aboutsummaryrefslogtreecommitdiffstats
path: root/io
diff options
context:
space:
mode:
Diffstat (limited to 'io')
-rw-r--r--io/channel-buffer.c251
-rw-r--r--io/channel-command.c392
-rw-r--r--io/channel-file.c234
-rw-r--r--io/channel-socket.c806
-rw-r--r--io/channel-tls.c434
-rw-r--r--io/channel-util.c38
-rw-r--r--io/channel-watch.c353
-rw-r--r--io/channel-websock.c1335
-rw-r--r--io/channel.c623
-rw-r--r--io/dns-resolver.c283
-rw-r--r--io/meson.build15
-rw-r--r--io/net-listener.c322
-rw-r--r--io/task.c241
-rw-r--r--io/trace-events65
-rw-r--r--io/trace.h1
15 files changed, 5393 insertions, 0 deletions
diff --git a/io/channel-buffer.c b/io/channel-buffer.c
new file mode 100644
index 000000000..baa4e2b08
--- /dev/null
+++ b/io/channel-buffer.c
@@ -0,0 +1,251 @@
+/*
+ * QEMU I/O channels memory buffer driver
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/channel-buffer.h"
+#include "io/channel-watch.h"
+#include "qemu/module.h"
+#include "qemu/sockets.h"
+#include "trace.h"
+
+QIOChannelBuffer *
+qio_channel_buffer_new(size_t capacity)
+{
+ QIOChannelBuffer *ioc;
+
+ ioc = QIO_CHANNEL_BUFFER(object_new(TYPE_QIO_CHANNEL_BUFFER));
+
+ if (capacity) {
+ ioc->data = g_new0(uint8_t, capacity);
+ ioc->capacity = capacity;
+ }
+
+ return ioc;
+}
+
+
+static void qio_channel_buffer_finalize(Object *obj)
+{
+ QIOChannelBuffer *ioc = QIO_CHANNEL_BUFFER(obj);
+ g_free(ioc->data);
+ ioc->capacity = ioc->usage = ioc->offset = 0;
+}
+
+
+static ssize_t qio_channel_buffer_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
+ ssize_t ret = 0;
+ size_t i;
+
+ for (i = 0; i < niov; i++) {
+ size_t want = iov[i].iov_len;
+ if (bioc->offset >= bioc->usage) {
+ break;
+ }
+ if ((bioc->offset + want) > bioc->usage) {
+ want = bioc->usage - bioc->offset;
+ }
+ memcpy(iov[i].iov_base, bioc->data + bioc->offset, want);
+ ret += want;
+ bioc->offset += want;
+ }
+
+ return ret;
+}
+
+static ssize_t qio_channel_buffer_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
+ ssize_t ret = 0;
+ size_t i;
+ size_t towrite = 0;
+
+ for (i = 0; i < niov; i++) {
+ towrite += iov[i].iov_len;
+ }
+
+ if ((bioc->offset + towrite) > bioc->capacity) {
+ bioc->capacity = bioc->offset + towrite;
+ bioc->data = g_realloc(bioc->data, bioc->capacity);
+ }
+
+ if (bioc->offset > bioc->usage) {
+ memset(bioc->data, 0, bioc->offset - bioc->usage);
+ bioc->usage = bioc->offset;
+ }
+
+ for (i = 0; i < niov; i++) {
+ memcpy(bioc->data + bioc->usage,
+ iov[i].iov_base,
+ iov[i].iov_len);
+ bioc->usage += iov[i].iov_len;
+ bioc->offset += iov[i].iov_len;
+ ret += iov[i].iov_len;
+ }
+
+ return ret;
+}
+
+static int qio_channel_buffer_set_blocking(QIOChannel *ioc G_GNUC_UNUSED,
+ bool enabled G_GNUC_UNUSED,
+ Error **errp G_GNUC_UNUSED)
+{
+ return 0;
+}
+
+
+static off_t qio_channel_buffer_seek(QIOChannel *ioc,
+ off_t offset,
+ int whence,
+ Error **errp)
+{
+ QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
+
+ bioc->offset = offset;
+
+ return offset;
+}
+
+
+static int qio_channel_buffer_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
+
+ g_free(bioc->data);
+ bioc->data = NULL;
+ bioc->capacity = bioc->usage = bioc->offset = 0;
+
+ return 0;
+}
+
+
+typedef struct QIOChannelBufferSource QIOChannelBufferSource;
+struct QIOChannelBufferSource {
+ GSource parent;
+ QIOChannelBuffer *bioc;
+ GIOCondition condition;
+};
+
+static gboolean
+qio_channel_buffer_source_prepare(GSource *source,
+ gint *timeout)
+{
+ QIOChannelBufferSource *bsource = (QIOChannelBufferSource *)source;
+
+ *timeout = -1;
+
+ return (G_IO_IN | G_IO_OUT) & bsource->condition;
+}
+
+static gboolean
+qio_channel_buffer_source_check(GSource *source)
+{
+ QIOChannelBufferSource *bsource = (QIOChannelBufferSource *)source;
+
+ return (G_IO_IN | G_IO_OUT) & bsource->condition;
+}
+
+static gboolean
+qio_channel_buffer_source_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ QIOChannelFunc func = (QIOChannelFunc)callback;
+ QIOChannelBufferSource *bsource = (QIOChannelBufferSource *)source;
+
+ return (*func)(QIO_CHANNEL(bsource->bioc),
+ ((G_IO_IN | G_IO_OUT) & bsource->condition),
+ user_data);
+}
+
+static void
+qio_channel_buffer_source_finalize(GSource *source)
+{
+ QIOChannelBufferSource *ssource = (QIOChannelBufferSource *)source;
+
+ object_unref(OBJECT(ssource->bioc));
+}
+
+GSourceFuncs qio_channel_buffer_source_funcs = {
+ qio_channel_buffer_source_prepare,
+ qio_channel_buffer_source_check,
+ qio_channel_buffer_source_dispatch,
+ qio_channel_buffer_source_finalize
+};
+
+static GSource *qio_channel_buffer_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
+ QIOChannelBufferSource *ssource;
+ GSource *source;
+
+ source = g_source_new(&qio_channel_buffer_source_funcs,
+ sizeof(QIOChannelBufferSource));
+ ssource = (QIOChannelBufferSource *)source;
+
+ ssource->bioc = bioc;
+ object_ref(OBJECT(bioc));
+
+ ssource->condition = condition;
+
+ return source;
+}
+
+
+static void qio_channel_buffer_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_buffer_writev;
+ ioc_klass->io_readv = qio_channel_buffer_readv;
+ ioc_klass->io_set_blocking = qio_channel_buffer_set_blocking;
+ ioc_klass->io_seek = qio_channel_buffer_seek;
+ ioc_klass->io_close = qio_channel_buffer_close;
+ ioc_klass->io_create_watch = qio_channel_buffer_create_watch;
+}
+
+static const TypeInfo qio_channel_buffer_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_BUFFER,
+ .instance_size = sizeof(QIOChannelBuffer),
+ .instance_finalize = qio_channel_buffer_finalize,
+ .class_init = qio_channel_buffer_class_init,
+};
+
+static void qio_channel_buffer_register_types(void)
+{
+ type_register_static(&qio_channel_buffer_info);
+}
+
+type_init(qio_channel_buffer_register_types);
diff --git a/io/channel-command.c b/io/channel-command.c
new file mode 100644
index 000000000..b2a9e2713
--- /dev/null
+++ b/io/channel-command.c
@@ -0,0 +1,392 @@
+/*
+ * QEMU I/O channels external command driver
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/channel-command.h"
+#include "io/channel-watch.h"
+#include "qapi/error.h"
+#include "qemu/module.h"
+#include "qemu/sockets.h"
+#include "trace.h"
+
+
+QIOChannelCommand *
+qio_channel_command_new_pid(int writefd,
+ int readfd,
+ pid_t pid)
+{
+ QIOChannelCommand *ioc;
+
+ ioc = QIO_CHANNEL_COMMAND(object_new(TYPE_QIO_CHANNEL_COMMAND));
+
+ ioc->readfd = readfd;
+ ioc->writefd = writefd;
+ ioc->pid = pid;
+
+ trace_qio_channel_command_new_pid(ioc, writefd, readfd, pid);
+ return ioc;
+}
+
+
+#ifndef WIN32
+QIOChannelCommand *
+qio_channel_command_new_spawn(const char *const argv[],
+ int flags,
+ Error **errp)
+{
+ pid_t pid = -1;
+ int stdinfd[2] = { -1, -1 };
+ int stdoutfd[2] = { -1, -1 };
+ int devnull = -1;
+ bool stdinnull = false, stdoutnull = false;
+ QIOChannelCommand *ioc;
+
+ flags = flags & O_ACCMODE;
+
+ if (flags == O_RDONLY) {
+ stdinnull = true;
+ }
+ if (flags == O_WRONLY) {
+ stdoutnull = true;
+ }
+
+ if (stdinnull || stdoutnull) {
+ devnull = open("/dev/null", O_RDWR);
+ if (devnull < 0) {
+ error_setg_errno(errp, errno,
+ "Unable to open /dev/null");
+ goto error;
+ }
+ }
+
+ if ((!stdinnull && pipe(stdinfd) < 0) ||
+ (!stdoutnull && pipe(stdoutfd) < 0)) {
+ error_setg_errno(errp, errno,
+ "Unable to open pipe");
+ goto error;
+ }
+
+ pid = qemu_fork(errp);
+ if (pid < 0) {
+ goto error;
+ }
+
+ if (pid == 0) { /* child */
+ dup2(stdinnull ? devnull : stdinfd[0], STDIN_FILENO);
+ dup2(stdoutnull ? devnull : stdoutfd[1], STDOUT_FILENO);
+ /* Leave stderr connected to qemu's stderr */
+
+ if (!stdinnull) {
+ close(stdinfd[0]);
+ close(stdinfd[1]);
+ }
+ if (!stdoutnull) {
+ close(stdoutfd[0]);
+ close(stdoutfd[1]);
+ }
+ if (devnull != -1) {
+ close(devnull);
+ }
+
+ execv(argv[0], (char * const *)argv);
+ _exit(1);
+ }
+
+ if (!stdinnull) {
+ close(stdinfd[0]);
+ }
+ if (!stdoutnull) {
+ close(stdoutfd[1]);
+ }
+
+ ioc = qio_channel_command_new_pid(stdinnull ? devnull : stdinfd[1],
+ stdoutnull ? devnull : stdoutfd[0],
+ pid);
+ trace_qio_channel_command_new_spawn(ioc, argv[0], flags);
+ return ioc;
+
+ error:
+ if (devnull != -1) {
+ close(devnull);
+ }
+ if (stdinfd[0] != -1) {
+ close(stdinfd[0]);
+ }
+ if (stdinfd[1] != -1) {
+ close(stdinfd[1]);
+ }
+ if (stdoutfd[0] != -1) {
+ close(stdoutfd[0]);
+ }
+ if (stdoutfd[1] != -1) {
+ close(stdoutfd[1]);
+ }
+ return NULL;
+}
+
+#else /* WIN32 */
+QIOChannelCommand *
+qio_channel_command_new_spawn(const char *const argv[],
+ int flags,
+ Error **errp)
+{
+ error_setg_errno(errp, ENOSYS,
+ "Command spawn not supported on this platform");
+ return NULL;
+}
+#endif /* WIN32 */
+
+#ifndef WIN32
+static int qio_channel_command_abort(QIOChannelCommand *ioc,
+ Error **errp)
+{
+ pid_t ret;
+ int status;
+ int step = 0;
+
+ /* See if intermediate process has exited; if not, try a nice
+ * SIGTERM followed by a more severe SIGKILL.
+ */
+ rewait:
+ trace_qio_channel_command_abort(ioc, ioc->pid);
+ ret = waitpid(ioc->pid, &status, WNOHANG);
+ trace_qio_channel_command_wait(ioc, ioc->pid, ret, status);
+ if (ret == (pid_t)-1) {
+ if (errno == EINTR) {
+ goto rewait;
+ } else {
+ error_setg_errno(errp, errno,
+ "Cannot wait on pid %llu",
+ (unsigned long long)ioc->pid);
+ return -1;
+ }
+ } else if (ret == 0) {
+ if (step == 0) {
+ kill(ioc->pid, SIGTERM);
+ } else if (step == 1) {
+ kill(ioc->pid, SIGKILL);
+ } else {
+ error_setg(errp,
+ "Process %llu refused to die",
+ (unsigned long long)ioc->pid);
+ return -1;
+ }
+ step++;
+ usleep(10 * 1000);
+ goto rewait;
+ }
+
+ return 0;
+}
+#endif /* ! WIN32 */
+
+
+static void qio_channel_command_init(Object *obj)
+{
+ QIOChannelCommand *ioc = QIO_CHANNEL_COMMAND(obj);
+ ioc->readfd = -1;
+ ioc->writefd = -1;
+ ioc->pid = -1;
+}
+
+static void qio_channel_command_finalize(Object *obj)
+{
+ QIOChannelCommand *ioc = QIO_CHANNEL_COMMAND(obj);
+ if (ioc->readfd != -1) {
+ close(ioc->readfd);
+ }
+ if (ioc->writefd != -1 &&
+ ioc->writefd != ioc->readfd) {
+ close(ioc->writefd);
+ }
+ ioc->writefd = ioc->readfd = -1;
+ if (ioc->pid > 0) {
+#ifndef WIN32
+ qio_channel_command_abort(ioc, NULL);
+#endif
+ }
+}
+
+
+static ssize_t qio_channel_command_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
+ ssize_t ret;
+
+ retry:
+ ret = readv(cioc->readfd, iov, niov);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ if (errno == EINTR) {
+ goto retry;
+ }
+
+ error_setg_errno(errp, errno,
+ "Unable to read from command");
+ return -1;
+ }
+
+ return ret;
+}
+
+static ssize_t qio_channel_command_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
+ ssize_t ret;
+
+ retry:
+ ret = writev(cioc->writefd, iov, niov);
+ if (ret <= 0) {
+ if (errno == EAGAIN) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ if (errno == EINTR) {
+ goto retry;
+ }
+ error_setg_errno(errp, errno, "%s",
+ "Unable to write to command");
+ return -1;
+ }
+ return ret;
+}
+
+static int qio_channel_command_set_blocking(QIOChannel *ioc,
+ bool enabled,
+ Error **errp)
+{
+ QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
+
+ if (enabled) {
+ qemu_set_block(cioc->writefd);
+ qemu_set_block(cioc->readfd);
+ } else {
+ qemu_set_nonblock(cioc->writefd);
+ qemu_set_nonblock(cioc->readfd);
+ }
+
+ return 0;
+}
+
+
+static int qio_channel_command_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
+ int rv = 0;
+#ifndef WIN32
+ pid_t wp;
+#endif
+
+ /* We close FDs before killing, because that
+ * gives a better chance of clean shutdown
+ */
+ if (cioc->readfd != -1 &&
+ close(cioc->readfd) < 0) {
+ rv = -1;
+ }
+ if (cioc->writefd != -1 &&
+ cioc->writefd != cioc->readfd &&
+ close(cioc->writefd) < 0) {
+ rv = -1;
+ }
+ cioc->writefd = cioc->readfd = -1;
+
+#ifndef WIN32
+ do {
+ wp = waitpid(cioc->pid, NULL, 0);
+ } while (wp == (pid_t)-1 && errno == EINTR);
+ if (wp == (pid_t)-1) {
+ error_setg_errno(errp, errno, "Failed to wait for pid %llu",
+ (unsigned long long)cioc->pid);
+ return -1;
+ }
+#endif
+
+ if (rv < 0) {
+ error_setg_errno(errp, errno, "%s",
+ "Unable to close command");
+ }
+ return rv;
+}
+
+
+static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
+ aio_set_fd_handler(ctx, cioc->readfd, false, io_read, NULL, NULL, opaque);
+ aio_set_fd_handler(ctx, cioc->writefd, false, NULL, io_write, NULL, opaque);
+}
+
+
+static GSource *qio_channel_command_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
+ return qio_channel_create_fd_pair_watch(ioc,
+ cioc->readfd,
+ cioc->writefd,
+ condition);
+}
+
+
+static void qio_channel_command_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_command_writev;
+ ioc_klass->io_readv = qio_channel_command_readv;
+ ioc_klass->io_set_blocking = qio_channel_command_set_blocking;
+ ioc_klass->io_close = qio_channel_command_close;
+ ioc_klass->io_create_watch = qio_channel_command_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_command_set_aio_fd_handler;
+}
+
+static const TypeInfo qio_channel_command_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_COMMAND,
+ .instance_size = sizeof(QIOChannelCommand),
+ .instance_init = qio_channel_command_init,
+ .instance_finalize = qio_channel_command_finalize,
+ .class_init = qio_channel_command_class_init,
+};
+
+static void qio_channel_command_register_types(void)
+{
+ type_register_static(&qio_channel_command_info);
+}
+
+type_init(qio_channel_command_register_types);
diff --git a/io/channel-file.c b/io/channel-file.c
new file mode 100644
index 000000000..c4bf799a8
--- /dev/null
+++ b/io/channel-file.c
@@ -0,0 +1,234 @@
+/*
+ * QEMU I/O channels files driver
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/channel-file.h"
+#include "io/channel-watch.h"
+#include "qapi/error.h"
+#include "qemu/module.h"
+#include "qemu/sockets.h"
+#include "trace.h"
+
+QIOChannelFile *
+qio_channel_file_new_fd(int fd)
+{
+ QIOChannelFile *ioc;
+
+ ioc = QIO_CHANNEL_FILE(object_new(TYPE_QIO_CHANNEL_FILE));
+
+ ioc->fd = fd;
+
+ trace_qio_channel_file_new_fd(ioc, fd);
+
+ return ioc;
+}
+
+
+QIOChannelFile *
+qio_channel_file_new_path(const char *path,
+ int flags,
+ mode_t mode,
+ Error **errp)
+{
+ QIOChannelFile *ioc;
+
+ ioc = QIO_CHANNEL_FILE(object_new(TYPE_QIO_CHANNEL_FILE));
+
+ ioc->fd = qemu_open_old(path, flags, mode);
+ if (ioc->fd < 0) {
+ object_unref(OBJECT(ioc));
+ error_setg_errno(errp, errno,
+ "Unable to open %s", path);
+ return NULL;
+ }
+
+ trace_qio_channel_file_new_path(ioc, path, flags, mode, ioc->fd);
+
+ return ioc;
+}
+
+
+static void qio_channel_file_init(Object *obj)
+{
+ QIOChannelFile *ioc = QIO_CHANNEL_FILE(obj);
+ ioc->fd = -1;
+}
+
+static void qio_channel_file_finalize(Object *obj)
+{
+ QIOChannelFile *ioc = QIO_CHANNEL_FILE(obj);
+ if (ioc->fd != -1) {
+ qemu_close(ioc->fd);
+ ioc->fd = -1;
+ }
+}
+
+
+static ssize_t qio_channel_file_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+ ssize_t ret;
+
+ retry:
+ ret = readv(fioc->fd, iov, niov);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ if (errno == EINTR) {
+ goto retry;
+ }
+
+ error_setg_errno(errp, errno,
+ "Unable to read from file");
+ return -1;
+ }
+
+ return ret;
+}
+
+static ssize_t qio_channel_file_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+ ssize_t ret;
+
+ retry:
+ ret = writev(fioc->fd, iov, niov);
+ if (ret <= 0) {
+ if (errno == EAGAIN) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ if (errno == EINTR) {
+ goto retry;
+ }
+ error_setg_errno(errp, errno,
+ "Unable to write to file");
+ return -1;
+ }
+ return ret;
+}
+
+static int qio_channel_file_set_blocking(QIOChannel *ioc,
+ bool enabled,
+ Error **errp)
+{
+ QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+
+ if (enabled) {
+ qemu_set_block(fioc->fd);
+ } else {
+ qemu_set_nonblock(fioc->fd);
+ }
+ return 0;
+}
+
+
+static off_t qio_channel_file_seek(QIOChannel *ioc,
+ off_t offset,
+ int whence,
+ Error **errp)
+{
+ QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+ off_t ret;
+
+ ret = lseek(fioc->fd, offset, whence);
+ if (ret == (off_t)-1) {
+ error_setg_errno(errp, errno,
+ "Unable to seek to offset %lld whence %d in file",
+ (long long int)offset, whence);
+ return -1;
+ }
+ return ret;
+}
+
+
+static int qio_channel_file_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+
+ if (qemu_close(fioc->fd) < 0) {
+ error_setg_errno(errp, errno,
+ "Unable to close file");
+ return -1;
+ }
+ fioc->fd = -1;
+ return 0;
+}
+
+
+static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+ aio_set_fd_handler(ctx, fioc->fd, false, io_read, io_write, NULL, opaque);
+}
+
+static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+ return qio_channel_create_fd_watch(ioc,
+ fioc->fd,
+ condition);
+}
+
+static void qio_channel_file_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_file_writev;
+ ioc_klass->io_readv = qio_channel_file_readv;
+ ioc_klass->io_set_blocking = qio_channel_file_set_blocking;
+ ioc_klass->io_seek = qio_channel_file_seek;
+ ioc_klass->io_close = qio_channel_file_close;
+ ioc_klass->io_create_watch = qio_channel_file_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_file_set_aio_fd_handler;
+}
+
+static const TypeInfo qio_channel_file_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_FILE,
+ .instance_size = sizeof(QIOChannelFile),
+ .instance_init = qio_channel_file_init,
+ .instance_finalize = qio_channel_file_finalize,
+ .class_init = qio_channel_file_class_init,
+};
+
+static void qio_channel_file_register_types(void)
+{
+ type_register_static(&qio_channel_file_info);
+}
+
+type_init(qio_channel_file_register_types);
diff --git a/io/channel-socket.c b/io/channel-socket.c
new file mode 100644
index 000000000..606ec97cf
--- /dev/null
+++ b/io/channel-socket.c
@@ -0,0 +1,806 @@
+/*
+ * QEMU I/O channels sockets driver
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu-common.h"
+#include "qapi/error.h"
+#include "qapi/qapi-visit-sockets.h"
+#include "qemu/module.h"
+#include "io/channel-socket.h"
+#include "io/channel-watch.h"
+#include "trace.h"
+#include "qapi/clone-visitor.h"
+
+#define SOCKET_MAX_FDS 16
+
+SocketAddress *
+qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
+ Error **errp)
+{
+ return socket_sockaddr_to_address(&ioc->localAddr,
+ ioc->localAddrLen,
+ errp);
+}
+
+SocketAddress *
+qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
+ Error **errp)
+{
+ return socket_sockaddr_to_address(&ioc->remoteAddr,
+ ioc->remoteAddrLen,
+ errp);
+}
+
+QIOChannelSocket *
+qio_channel_socket_new(void)
+{
+ QIOChannelSocket *sioc;
+ QIOChannel *ioc;
+
+ sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
+ sioc->fd = -1;
+
+ ioc = QIO_CHANNEL(sioc);
+ qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
+
+#ifdef WIN32
+ ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
+#endif
+
+ trace_qio_channel_socket_new(sioc);
+
+ return sioc;
+}
+
+
+static int
+qio_channel_socket_set_fd(QIOChannelSocket *sioc,
+ int fd,
+ Error **errp)
+{
+ if (sioc->fd != -1) {
+ error_setg(errp, "Socket is already open");
+ return -1;
+ }
+
+ sioc->fd = fd;
+ sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
+ sioc->localAddrLen = sizeof(sioc->localAddr);
+
+
+ if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
+ &sioc->remoteAddrLen) < 0) {
+ if (errno == ENOTCONN) {
+ memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
+ sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
+ } else {
+ error_setg_errno(errp, errno,
+ "Unable to query remote socket address");
+ goto error;
+ }
+ }
+
+ if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
+ &sioc->localAddrLen) < 0) {
+ error_setg_errno(errp, errno,
+ "Unable to query local socket address");
+ goto error;
+ }
+
+#ifndef WIN32
+ if (sioc->localAddr.ss_family == AF_UNIX) {
+ QIOChannel *ioc = QIO_CHANNEL(sioc);
+ qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
+ }
+#endif /* WIN32 */
+
+ return 0;
+
+ error:
+ sioc->fd = -1; /* Let the caller close FD on failure */
+ return -1;
+}
+
+QIOChannelSocket *
+qio_channel_socket_new_fd(int fd,
+ Error **errp)
+{
+ QIOChannelSocket *ioc;
+
+ ioc = qio_channel_socket_new();
+ if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
+ object_unref(OBJECT(ioc));
+ return NULL;
+ }
+
+ trace_qio_channel_socket_new_fd(ioc, fd);
+
+ return ioc;
+}
+
+
+int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
+ SocketAddress *addr,
+ Error **errp)
+{
+ int fd;
+
+ trace_qio_channel_socket_connect_sync(ioc, addr);
+ fd = socket_connect(addr, errp);
+ if (fd < 0) {
+ trace_qio_channel_socket_connect_fail(ioc);
+ return -1;
+ }
+
+ trace_qio_channel_socket_connect_complete(ioc, fd);
+ if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
+ close(fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static void qio_channel_socket_connect_worker(QIOTask *task,
+ gpointer opaque)
+{
+ QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
+ SocketAddress *addr = opaque;
+ Error *err = NULL;
+
+ qio_channel_socket_connect_sync(ioc, addr, &err);
+
+ qio_task_set_error(task, err);
+}
+
+
+void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
+ SocketAddress *addr,
+ QIOTaskFunc callback,
+ gpointer opaque,
+ GDestroyNotify destroy,
+ GMainContext *context)
+{
+ QIOTask *task = qio_task_new(
+ OBJECT(ioc), callback, opaque, destroy);
+ SocketAddress *addrCopy;
+
+ addrCopy = QAPI_CLONE(SocketAddress, addr);
+
+ /* socket_connect() does a non-blocking connect(), but it
+ * still blocks in DNS lookups, so we must use a thread */
+ trace_qio_channel_socket_connect_async(ioc, addr);
+ qio_task_run_in_thread(task,
+ qio_channel_socket_connect_worker,
+ addrCopy,
+ (GDestroyNotify)qapi_free_SocketAddress,
+ context);
+}
+
+
+int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
+ SocketAddress *addr,
+ int num,
+ Error **errp)
+{
+ int fd;
+
+ trace_qio_channel_socket_listen_sync(ioc, addr, num);
+ fd = socket_listen(addr, num, errp);
+ if (fd < 0) {
+ trace_qio_channel_socket_listen_fail(ioc);
+ return -1;
+ }
+
+ trace_qio_channel_socket_listen_complete(ioc, fd);
+ if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
+ close(fd);
+ return -1;
+ }
+ qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
+
+ return 0;
+}
+
+
+struct QIOChannelListenWorkerData {
+ SocketAddress *addr;
+ int num; /* amount of expected connections */
+};
+
+static void qio_channel_listen_worker_free(gpointer opaque)
+{
+ struct QIOChannelListenWorkerData *data = opaque;
+
+ qapi_free_SocketAddress(data->addr);
+ g_free(data);
+}
+
+static void qio_channel_socket_listen_worker(QIOTask *task,
+ gpointer opaque)
+{
+ QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
+ struct QIOChannelListenWorkerData *data = opaque;
+ Error *err = NULL;
+
+ qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
+
+ qio_task_set_error(task, err);
+}
+
+
+void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
+ SocketAddress *addr,
+ int num,
+ QIOTaskFunc callback,
+ gpointer opaque,
+ GDestroyNotify destroy,
+ GMainContext *context)
+{
+ QIOTask *task = qio_task_new(
+ OBJECT(ioc), callback, opaque, destroy);
+ struct QIOChannelListenWorkerData *data;
+
+ data = g_new0(struct QIOChannelListenWorkerData, 1);
+ data->addr = QAPI_CLONE(SocketAddress, addr);
+ data->num = num;
+
+ /* socket_listen() blocks in DNS lookups, so we must use a thread */
+ trace_qio_channel_socket_listen_async(ioc, addr, num);
+ qio_task_run_in_thread(task,
+ qio_channel_socket_listen_worker,
+ data,
+ qio_channel_listen_worker_free,
+ context);
+}
+
+
+int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
+ SocketAddress *localAddr,
+ SocketAddress *remoteAddr,
+ Error **errp)
+{
+ int fd;
+
+ trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
+ fd = socket_dgram(remoteAddr, localAddr, errp);
+ if (fd < 0) {
+ trace_qio_channel_socket_dgram_fail(ioc);
+ return -1;
+ }
+
+ trace_qio_channel_socket_dgram_complete(ioc, fd);
+ if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
+ close(fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+
+struct QIOChannelSocketDGramWorkerData {
+ SocketAddress *localAddr;
+ SocketAddress *remoteAddr;
+};
+
+
+static void qio_channel_socket_dgram_worker_free(gpointer opaque)
+{
+ struct QIOChannelSocketDGramWorkerData *data = opaque;
+ qapi_free_SocketAddress(data->localAddr);
+ qapi_free_SocketAddress(data->remoteAddr);
+ g_free(data);
+}
+
+static void qio_channel_socket_dgram_worker(QIOTask *task,
+ gpointer opaque)
+{
+ QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
+ struct QIOChannelSocketDGramWorkerData *data = opaque;
+ Error *err = NULL;
+
+ /* socket_dgram() blocks in DNS lookups, so we must use a thread */
+ qio_channel_socket_dgram_sync(ioc, data->localAddr,
+ data->remoteAddr, &err);
+
+ qio_task_set_error(task, err);
+}
+
+
+void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
+ SocketAddress *localAddr,
+ SocketAddress *remoteAddr,
+ QIOTaskFunc callback,
+ gpointer opaque,
+ GDestroyNotify destroy,
+ GMainContext *context)
+{
+ QIOTask *task = qio_task_new(
+ OBJECT(ioc), callback, opaque, destroy);
+ struct QIOChannelSocketDGramWorkerData *data = g_new0(
+ struct QIOChannelSocketDGramWorkerData, 1);
+
+ data->localAddr = QAPI_CLONE(SocketAddress, localAddr);
+ data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);
+
+ trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
+ qio_task_run_in_thread(task,
+ qio_channel_socket_dgram_worker,
+ data,
+ qio_channel_socket_dgram_worker_free,
+ context);
+}
+
+
+QIOChannelSocket *
+qio_channel_socket_accept(QIOChannelSocket *ioc,
+ Error **errp)
+{
+ QIOChannelSocket *cioc;
+
+ cioc = qio_channel_socket_new();
+ cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
+ cioc->localAddrLen = sizeof(ioc->localAddr);
+
+ retry:
+ trace_qio_channel_socket_accept(ioc);
+ cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
+ &cioc->remoteAddrLen);
+ if (cioc->fd < 0) {
+ if (errno == EINTR) {
+ goto retry;
+ }
+ error_setg_errno(errp, errno, "Unable to accept connection");
+ trace_qio_channel_socket_accept_fail(ioc);
+ goto error;
+ }
+
+ if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
+ &cioc->localAddrLen) < 0) {
+ error_setg_errno(errp, errno,
+ "Unable to query local socket address");
+ goto error;
+ }
+
+#ifndef WIN32
+ if (cioc->localAddr.ss_family == AF_UNIX) {
+ QIOChannel *ioc_local = QIO_CHANNEL(cioc);
+ qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
+ }
+#endif /* WIN32 */
+
+ trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
+ return cioc;
+
+ error:
+ object_unref(OBJECT(cioc));
+ return NULL;
+}
+
+static void qio_channel_socket_init(Object *obj)
+{
+ QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
+ ioc->fd = -1;
+}
+
+static void qio_channel_socket_finalize(Object *obj)
+{
+ QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
+
+ if (ioc->fd != -1) {
+ QIOChannel *ioc_local = QIO_CHANNEL(ioc);
+ if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) {
+ Error *err = NULL;
+
+ socket_listen_cleanup(ioc->fd, &err);
+ if (err) {
+ error_report_err(err);
+ err = NULL;
+ }
+ }
+#ifdef WIN32
+ WSAEventSelect(ioc->fd, NULL, 0);
+#endif
+ closesocket(ioc->fd);
+ ioc->fd = -1;
+ }
+}
+
+
+#ifndef WIN32
+static void qio_channel_socket_copy_fds(struct msghdr *msg,
+ int **fds, size_t *nfds)
+{
+ struct cmsghdr *cmsg;
+
+ *nfds = 0;
+ *fds = NULL;
+
+ for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+ int fd_size, i;
+ int gotfds;
+
+ if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
+ cmsg->cmsg_level != SOL_SOCKET ||
+ cmsg->cmsg_type != SCM_RIGHTS) {
+ continue;
+ }
+
+ fd_size = cmsg->cmsg_len - CMSG_LEN(0);
+
+ if (!fd_size) {
+ continue;
+ }
+
+ gotfds = fd_size / sizeof(int);
+ *fds = g_renew(int, *fds, *nfds + gotfds);
+ memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
+
+ for (i = 0; i < gotfds; i++) {
+ int fd = (*fds)[*nfds + i];
+ if (fd < 0) {
+ continue;
+ }
+
+ /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
+ qemu_set_block(fd);
+
+#ifndef MSG_CMSG_CLOEXEC
+ qemu_set_cloexec(fd);
+#endif
+ }
+ *nfds += gotfds;
+ }
+}
+
+
+static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ ssize_t ret;
+ struct msghdr msg = { NULL, };
+ char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
+ int sflags = 0;
+
+ memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
+
+ msg.msg_iov = (struct iovec *)iov;
+ msg.msg_iovlen = niov;
+ if (fds && nfds) {
+ msg.msg_control = control;
+ msg.msg_controllen = sizeof(control);
+#ifdef MSG_CMSG_CLOEXEC
+ sflags |= MSG_CMSG_CLOEXEC;
+#endif
+
+ }
+
+ retry:
+ ret = recvmsg(sioc->fd, &msg, sflags);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ if (errno == EINTR) {
+ goto retry;
+ }
+
+ error_setg_errno(errp, errno,
+ "Unable to read from socket");
+ return -1;
+ }
+
+ if (fds && nfds) {
+ qio_channel_socket_copy_fds(&msg, fds, nfds);
+ }
+
+ return ret;
+}
+
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ ssize_t ret;
+ struct msghdr msg = { NULL, };
+ char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
+ size_t fdsize = sizeof(int) * nfds;
+ struct cmsghdr *cmsg;
+
+ memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
+
+ msg.msg_iov = (struct iovec *)iov;
+ msg.msg_iovlen = niov;
+
+ if (nfds) {
+ if (nfds > SOCKET_MAX_FDS) {
+ error_setg_errno(errp, EINVAL,
+ "Only %d FDs can be sent, got %zu",
+ SOCKET_MAX_FDS, nfds);
+ return -1;
+ }
+
+ msg.msg_control = control;
+ msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_len = CMSG_LEN(fdsize);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ memcpy(CMSG_DATA(cmsg), fds, fdsize);
+ }
+
+ retry:
+ ret = sendmsg(sioc->fd, &msg, 0);
+ if (ret <= 0) {
+ if (errno == EAGAIN) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ if (errno == EINTR) {
+ goto retry;
+ }
+ error_setg_errno(errp, errno,
+ "Unable to write to socket");
+ return -1;
+ }
+ return ret;
+}
+#else /* WIN32 */
+static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ ssize_t done = 0;
+ ssize_t i;
+
+ for (i = 0; i < niov; i++) {
+ ssize_t ret;
+ retry:
+ ret = recv(sioc->fd,
+ iov[i].iov_base,
+ iov[i].iov_len,
+ 0);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ if (done) {
+ return done;
+ } else {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ } else if (errno == EINTR) {
+ goto retry;
+ } else {
+ error_setg_errno(errp, errno,
+ "Unable to read from socket");
+ return -1;
+ }
+ }
+ done += ret;
+ if (ret < iov[i].iov_len) {
+ return done;
+ }
+ }
+
+ return done;
+}
+
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ ssize_t done = 0;
+ ssize_t i;
+
+ for (i = 0; i < niov; i++) {
+ ssize_t ret;
+ retry:
+ ret = send(sioc->fd,
+ iov[i].iov_base,
+ iov[i].iov_len,
+ 0);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ if (done) {
+ return done;
+ } else {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ } else if (errno == EINTR) {
+ goto retry;
+ } else {
+ error_setg_errno(errp, errno,
+ "Unable to write to socket");
+ return -1;
+ }
+ }
+ done += ret;
+ if (ret < iov[i].iov_len) {
+ return done;
+ }
+ }
+
+ return done;
+}
+#endif /* WIN32 */
+
+static int
+qio_channel_socket_set_blocking(QIOChannel *ioc,
+ bool enabled,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+
+ if (enabled) {
+ qemu_set_block(sioc->fd);
+ } else {
+ qemu_set_nonblock(sioc->fd);
+ }
+ return 0;
+}
+
+
+static void
+qio_channel_socket_set_delay(QIOChannel *ioc,
+ bool enabled)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ int v = enabled ? 0 : 1;
+
+ qemu_setsockopt(sioc->fd,
+ IPPROTO_TCP, TCP_NODELAY,
+ &v, sizeof(v));
+}
+
+
+static void
+qio_channel_socket_set_cork(QIOChannel *ioc,
+ bool enabled)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ int v = enabled ? 1 : 0;
+
+ socket_set_cork(sioc->fd, v);
+}
+
+
+static int
+qio_channel_socket_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ int rc = 0;
+ Error *err = NULL;
+
+ if (sioc->fd != -1) {
+#ifdef WIN32
+ WSAEventSelect(sioc->fd, NULL, 0);
+#endif
+ if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
+ socket_listen_cleanup(sioc->fd, errp);
+ }
+
+ if (closesocket(sioc->fd) < 0) {
+ sioc->fd = -1;
+ error_setg_errno(&err, errno, "Unable to close socket");
+ error_propagate(errp, err);
+ return -1;
+ }
+ sioc->fd = -1;
+ }
+ return rc;
+}
+
+static int
+qio_channel_socket_shutdown(QIOChannel *ioc,
+ QIOChannelShutdown how,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ int sockhow;
+
+ switch (how) {
+ case QIO_CHANNEL_SHUTDOWN_READ:
+ sockhow = SHUT_RD;
+ break;
+ case QIO_CHANNEL_SHUTDOWN_WRITE:
+ sockhow = SHUT_WR;
+ break;
+ case QIO_CHANNEL_SHUTDOWN_BOTH:
+ default:
+ sockhow = SHUT_RDWR;
+ break;
+ }
+
+ if (shutdown(sioc->fd, sockhow) < 0) {
+ error_setg_errno(errp, errno,
+ "Unable to shutdown socket");
+ return -1;
+ }
+ return 0;
+}
+
+static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ aio_set_fd_handler(ctx, sioc->fd, false, io_read, io_write, NULL, opaque);
+}
+
+static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ return qio_channel_create_socket_watch(ioc,
+ sioc->fd,
+ condition);
+}
+
+static void qio_channel_socket_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_socket_writev;
+ ioc_klass->io_readv = qio_channel_socket_readv;
+ ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
+ ioc_klass->io_close = qio_channel_socket_close;
+ ioc_klass->io_shutdown = qio_channel_socket_shutdown;
+ ioc_klass->io_set_cork = qio_channel_socket_set_cork;
+ ioc_klass->io_set_delay = qio_channel_socket_set_delay;
+ ioc_klass->io_create_watch = qio_channel_socket_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
+}
+
+static const TypeInfo qio_channel_socket_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_SOCKET,
+ .instance_size = sizeof(QIOChannelSocket),
+ .instance_init = qio_channel_socket_init,
+ .instance_finalize = qio_channel_socket_finalize,
+ .class_init = qio_channel_socket_class_init,
+};
+
+static void qio_channel_socket_register_types(void)
+{
+ type_register_static(&qio_channel_socket_info);
+}
+
+type_init(qio_channel_socket_register_types);
diff --git a/io/channel-tls.c b/io/channel-tls.c
new file mode 100644
index 000000000..2ae1b92fc
--- /dev/null
+++ b/io/channel-tls.c
@@ -0,0 +1,434 @@
+/*
+ * QEMU I/O channels TLS driver
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qapi/error.h"
+#include "qemu/module.h"
+#include "io/channel-tls.h"
+#include "trace.h"
+#include "qemu/atomic.h"
+
+
+static ssize_t qio_channel_tls_write_handler(const char *buf,
+ size_t len,
+ void *opaque)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(opaque);
+ ssize_t ret;
+
+ ret = qio_channel_write(tioc->master, buf, len, NULL);
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ errno = EAGAIN;
+ return -1;
+ } else if (ret < 0) {
+ errno = EIO;
+ return -1;
+ }
+ return ret;
+}
+
+static ssize_t qio_channel_tls_read_handler(char *buf,
+ size_t len,
+ void *opaque)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(opaque);
+ ssize_t ret;
+
+ ret = qio_channel_read(tioc->master, buf, len, NULL);
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ errno = EAGAIN;
+ return -1;
+ } else if (ret < 0) {
+ errno = EIO;
+ return -1;
+ }
+ return ret;
+}
+
+
+QIOChannelTLS *
+qio_channel_tls_new_server(QIOChannel *master,
+ QCryptoTLSCreds *creds,
+ const char *aclname,
+ Error **errp)
+{
+ QIOChannelTLS *ioc;
+
+ ioc = QIO_CHANNEL_TLS(object_new(TYPE_QIO_CHANNEL_TLS));
+
+ ioc->master = master;
+ object_ref(OBJECT(master));
+
+ ioc->session = qcrypto_tls_session_new(
+ creds,
+ NULL,
+ aclname,
+ QCRYPTO_TLS_CREDS_ENDPOINT_SERVER,
+ errp);
+ if (!ioc->session) {
+ goto error;
+ }
+
+ qcrypto_tls_session_set_callbacks(
+ ioc->session,
+ qio_channel_tls_write_handler,
+ qio_channel_tls_read_handler,
+ ioc);
+
+ trace_qio_channel_tls_new_server(ioc, master, creds, aclname);
+ return ioc;
+
+ error:
+ object_unref(OBJECT(ioc));
+ return NULL;
+}
+
+QIOChannelTLS *
+qio_channel_tls_new_client(QIOChannel *master,
+ QCryptoTLSCreds *creds,
+ const char *hostname,
+ Error **errp)
+{
+ QIOChannelTLS *tioc;
+ QIOChannel *ioc;
+
+ tioc = QIO_CHANNEL_TLS(object_new(TYPE_QIO_CHANNEL_TLS));
+ ioc = QIO_CHANNEL(tioc);
+
+ tioc->master = master;
+ if (qio_channel_has_feature(master, QIO_CHANNEL_FEATURE_SHUTDOWN)) {
+ qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
+ }
+ object_ref(OBJECT(master));
+
+ tioc->session = qcrypto_tls_session_new(
+ creds,
+ hostname,
+ NULL,
+ QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT,
+ errp);
+ if (!tioc->session) {
+ goto error;
+ }
+
+ qcrypto_tls_session_set_callbacks(
+ tioc->session,
+ qio_channel_tls_write_handler,
+ qio_channel_tls_read_handler,
+ tioc);
+
+ trace_qio_channel_tls_new_client(tioc, master, creds, hostname);
+ return tioc;
+
+ error:
+ object_unref(OBJECT(tioc));
+ return NULL;
+}
+
+struct QIOChannelTLSData {
+ QIOTask *task;
+ GMainContext *context;
+};
+typedef struct QIOChannelTLSData QIOChannelTLSData;
+
+static gboolean qio_channel_tls_handshake_io(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer user_data);
+
+static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
+ QIOTask *task,
+ GMainContext *context)
+{
+ Error *err = NULL;
+ QCryptoTLSSessionHandshakeStatus status;
+
+ if (qcrypto_tls_session_handshake(ioc->session, &err) < 0) {
+ trace_qio_channel_tls_handshake_fail(ioc);
+ qio_task_set_error(task, err);
+ qio_task_complete(task);
+ return;
+ }
+
+ status = qcrypto_tls_session_get_handshake_status(ioc->session);
+ if (status == QCRYPTO_TLS_HANDSHAKE_COMPLETE) {
+ trace_qio_channel_tls_handshake_complete(ioc);
+ if (qcrypto_tls_session_check_credentials(ioc->session,
+ &err) < 0) {
+ trace_qio_channel_tls_credentials_deny(ioc);
+ qio_task_set_error(task, err);
+ } else {
+ trace_qio_channel_tls_credentials_allow(ioc);
+ }
+ qio_task_complete(task);
+ } else {
+ GIOCondition condition;
+ QIOChannelTLSData *data = g_new0(typeof(*data), 1);
+
+ data->task = task;
+ data->context = context;
+
+ if (context) {
+ g_main_context_ref(context);
+ }
+
+ if (status == QCRYPTO_TLS_HANDSHAKE_SENDING) {
+ condition = G_IO_OUT;
+ } else {
+ condition = G_IO_IN;
+ }
+
+ trace_qio_channel_tls_handshake_pending(ioc, status);
+ qio_channel_add_watch_full(ioc->master,
+ condition,
+ qio_channel_tls_handshake_io,
+ data,
+ NULL,
+ context);
+ }
+}
+
+
+static gboolean qio_channel_tls_handshake_io(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer user_data)
+{
+ QIOChannelTLSData *data = user_data;
+ QIOTask *task = data->task;
+ GMainContext *context = data->context;
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(
+ qio_task_get_source(task));
+
+ g_free(data);
+ qio_channel_tls_handshake_task(tioc, task, context);
+
+ if (context) {
+ g_main_context_unref(context);
+ }
+
+ return FALSE;
+}
+
+void qio_channel_tls_handshake(QIOChannelTLS *ioc,
+ QIOTaskFunc func,
+ gpointer opaque,
+ GDestroyNotify destroy,
+ GMainContext *context)
+{
+ QIOTask *task;
+
+ task = qio_task_new(OBJECT(ioc),
+ func, opaque, destroy);
+
+ trace_qio_channel_tls_handshake_start(ioc);
+ qio_channel_tls_handshake_task(ioc, task, context);
+}
+
+
+static void qio_channel_tls_init(Object *obj G_GNUC_UNUSED)
+{
+}
+
+
+static void qio_channel_tls_finalize(Object *obj)
+{
+ QIOChannelTLS *ioc = QIO_CHANNEL_TLS(obj);
+
+ object_unref(OBJECT(ioc->master));
+ qcrypto_tls_session_free(ioc->session);
+}
+
+
+static ssize_t qio_channel_tls_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+ size_t i;
+ ssize_t got = 0;
+
+ for (i = 0 ; i < niov ; i++) {
+ ssize_t ret = qcrypto_tls_session_read(tioc->session,
+ iov[i].iov_base,
+ iov[i].iov_len);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ if (got) {
+ return got;
+ } else {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ } else if (errno == ECONNABORTED &&
+ (qatomic_load_acquire(&tioc->shutdown) &
+ QIO_CHANNEL_SHUTDOWN_READ)) {
+ return 0;
+ }
+
+ error_setg_errno(errp, errno,
+ "Cannot read from TLS channel");
+ return -1;
+ }
+ got += ret;
+ if (ret < iov[i].iov_len) {
+ break;
+ }
+ }
+ return got;
+}
+
+
+static ssize_t qio_channel_tls_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+ size_t i;
+ ssize_t done = 0;
+
+ for (i = 0 ; i < niov ; i++) {
+ ssize_t ret = qcrypto_tls_session_write(tioc->session,
+ iov[i].iov_base,
+ iov[i].iov_len);
+ if (ret <= 0) {
+ if (errno == EAGAIN) {
+ if (done) {
+ return done;
+ } else {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ }
+
+ error_setg_errno(errp, errno,
+ "Cannot write to TLS channel");
+ return -1;
+ }
+ done += ret;
+ if (ret < iov[i].iov_len) {
+ break;
+ }
+ }
+ return done;
+}
+
+static int qio_channel_tls_set_blocking(QIOChannel *ioc,
+ bool enabled,
+ Error **errp)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+ return qio_channel_set_blocking(tioc->master, enabled, errp);
+}
+
+static void qio_channel_tls_set_delay(QIOChannel *ioc,
+ bool enabled)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+ qio_channel_set_delay(tioc->master, enabled);
+}
+
+static void qio_channel_tls_set_cork(QIOChannel *ioc,
+ bool enabled)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+ qio_channel_set_cork(tioc->master, enabled);
+}
+
+static int qio_channel_tls_shutdown(QIOChannel *ioc,
+ QIOChannelShutdown how,
+ Error **errp)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+ qatomic_or(&tioc->shutdown, how);
+
+ return qio_channel_shutdown(tioc->master, how, errp);
+}
+
+static int qio_channel_tls_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+ return qio_channel_close(tioc->master, errp);
+}
+
+static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+ qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque);
+}
+
+static GSource *qio_channel_tls_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+ return qio_channel_create_watch(tioc->master, condition);
+}
+
+QCryptoTLSSession *
+qio_channel_tls_get_session(QIOChannelTLS *ioc)
+{
+ return ioc->session;
+}
+
+static void qio_channel_tls_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_tls_writev;
+ ioc_klass->io_readv = qio_channel_tls_readv;
+ ioc_klass->io_set_blocking = qio_channel_tls_set_blocking;
+ ioc_klass->io_set_delay = qio_channel_tls_set_delay;
+ ioc_klass->io_set_cork = qio_channel_tls_set_cork;
+ ioc_klass->io_close = qio_channel_tls_close;
+ ioc_klass->io_shutdown = qio_channel_tls_shutdown;
+ ioc_klass->io_create_watch = qio_channel_tls_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_tls_set_aio_fd_handler;
+}
+
+static const TypeInfo qio_channel_tls_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_TLS,
+ .instance_size = sizeof(QIOChannelTLS),
+ .instance_init = qio_channel_tls_init,
+ .instance_finalize = qio_channel_tls_finalize,
+ .class_init = qio_channel_tls_class_init,
+};
+
+static void qio_channel_tls_register_types(void)
+{
+ type_register_static(&qio_channel_tls_info);
+}
+
+type_init(qio_channel_tls_register_types);
diff --git a/io/channel-util.c b/io/channel-util.c
new file mode 100644
index 000000000..848a7a43d
--- /dev/null
+++ b/io/channel-util.c
@@ -0,0 +1,38 @@
+/*
+ * QEMU I/O channels utility APIs
+ *
+ * Copyright (c) 2016 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/channel-util.h"
+#include "io/channel-file.h"
+#include "io/channel-socket.h"
+
+
+QIOChannel *qio_channel_new_fd(int fd,
+ Error **errp)
+{
+ QIOChannel *ioc;
+
+ if (fd_is_socket(fd)) {
+ ioc = QIO_CHANNEL(qio_channel_socket_new_fd(fd, errp));
+ } else {
+ ioc = QIO_CHANNEL(qio_channel_file_new_fd(fd));
+ }
+ return ioc;
+}
diff --git a/io/channel-watch.c b/io/channel-watch.c
new file mode 100644
index 000000000..0289b3647
--- /dev/null
+++ b/io/channel-watch.c
@@ -0,0 +1,353 @@
+/*
+ * QEMU I/O channels watch helper APIs
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/channel-watch.h"
+
+typedef struct QIOChannelFDSource QIOChannelFDSource;
+struct QIOChannelFDSource {
+ GSource parent;
+ GPollFD fd;
+ QIOChannel *ioc;
+ GIOCondition condition;
+};
+
+
+#ifdef CONFIG_WIN32
+typedef struct QIOChannelSocketSource QIOChannelSocketSource;
+struct QIOChannelSocketSource {
+ GSource parent;
+ GPollFD fd;
+ QIOChannel *ioc;
+ SOCKET socket;
+ int revents;
+ GIOCondition condition;
+};
+
+#endif
+
+
+typedef struct QIOChannelFDPairSource QIOChannelFDPairSource;
+struct QIOChannelFDPairSource {
+ GSource parent;
+ GPollFD fdread;
+ GPollFD fdwrite;
+ QIOChannel *ioc;
+ GIOCondition condition;
+};
+
+
+static gboolean
+qio_channel_fd_source_prepare(GSource *source G_GNUC_UNUSED,
+ gint *timeout)
+{
+ *timeout = -1;
+
+ return FALSE;
+}
+
+
+static gboolean
+qio_channel_fd_source_check(GSource *source)
+{
+ QIOChannelFDSource *ssource = (QIOChannelFDSource *)source;
+
+ return ssource->fd.revents & ssource->condition;
+}
+
+
+static gboolean
+qio_channel_fd_source_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ QIOChannelFunc func = (QIOChannelFunc)callback;
+ QIOChannelFDSource *ssource = (QIOChannelFDSource *)source;
+
+ return (*func)(ssource->ioc,
+ ssource->fd.revents & ssource->condition,
+ user_data);
+}
+
+
+static void
+qio_channel_fd_source_finalize(GSource *source)
+{
+ QIOChannelFDSource *ssource = (QIOChannelFDSource *)source;
+
+ object_unref(OBJECT(ssource->ioc));
+}
+
+
+#ifdef CONFIG_WIN32
+static gboolean
+qio_channel_socket_source_prepare(GSource *source G_GNUC_UNUSED,
+ gint *timeout)
+{
+ *timeout = -1;
+
+ return FALSE;
+}
+
+
+/*
+ * NB, this impl only works when the socket is in non-blocking
+ * mode on Win32
+ */
+static gboolean
+qio_channel_socket_source_check(GSource *source)
+{
+ static struct timeval tv0;
+
+ QIOChannelSocketSource *ssource = (QIOChannelSocketSource *)source;
+ WSANETWORKEVENTS ev;
+ fd_set rfds, wfds, xfds;
+
+ if (!ssource->condition) {
+ return 0;
+ }
+
+ WSAEnumNetworkEvents(ssource->socket, ssource->ioc->event, &ev);
+
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ FD_ZERO(&xfds);
+ if (ssource->condition & G_IO_IN) {
+ FD_SET((SOCKET)ssource->socket, &rfds);
+ }
+ if (ssource->condition & G_IO_OUT) {
+ FD_SET((SOCKET)ssource->socket, &wfds);
+ }
+ if (ssource->condition & G_IO_PRI) {
+ FD_SET((SOCKET)ssource->socket, &xfds);
+ }
+ ssource->revents = 0;
+ if (select(0, &rfds, &wfds, &xfds, &tv0) == 0) {
+ return 0;
+ }
+
+ if (FD_ISSET(ssource->socket, &rfds)) {
+ ssource->revents |= G_IO_IN;
+ }
+ if (FD_ISSET(ssource->socket, &wfds)) {
+ ssource->revents |= G_IO_OUT;
+ }
+ if (FD_ISSET(ssource->socket, &xfds)) {
+ ssource->revents |= G_IO_PRI;
+ }
+
+ return ssource->revents;
+}
+
+
+static gboolean
+qio_channel_socket_source_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ QIOChannelFunc func = (QIOChannelFunc)callback;
+ QIOChannelSocketSource *ssource = (QIOChannelSocketSource *)source;
+
+ return (*func)(ssource->ioc, ssource->revents, user_data);
+}
+
+
+static void
+qio_channel_socket_source_finalize(GSource *source)
+{
+ QIOChannelSocketSource *ssource = (QIOChannelSocketSource *)source;
+
+ object_unref(OBJECT(ssource->ioc));
+}
+
+
+GSourceFuncs qio_channel_socket_source_funcs = {
+ qio_channel_socket_source_prepare,
+ qio_channel_socket_source_check,
+ qio_channel_socket_source_dispatch,
+ qio_channel_socket_source_finalize
+};
+#endif
+
+
+static gboolean
+qio_channel_fd_pair_source_prepare(GSource *source G_GNUC_UNUSED,
+ gint *timeout)
+{
+ *timeout = -1;
+
+ return FALSE;
+}
+
+
+static gboolean
+qio_channel_fd_pair_source_check(GSource *source)
+{
+ QIOChannelFDPairSource *ssource = (QIOChannelFDPairSource *)source;
+ GIOCondition poll_condition = ssource->fdread.revents |
+ ssource->fdwrite.revents;
+
+ return poll_condition & ssource->condition;
+}
+
+
+static gboolean
+qio_channel_fd_pair_source_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ QIOChannelFunc func = (QIOChannelFunc)callback;
+ QIOChannelFDPairSource *ssource = (QIOChannelFDPairSource *)source;
+ GIOCondition poll_condition = ssource->fdread.revents |
+ ssource->fdwrite.revents;
+
+ return (*func)(ssource->ioc,
+ poll_condition & ssource->condition,
+ user_data);
+}
+
+
+static void
+qio_channel_fd_pair_source_finalize(GSource *source)
+{
+ QIOChannelFDPairSource *ssource = (QIOChannelFDPairSource *)source;
+
+ object_unref(OBJECT(ssource->ioc));
+}
+
+
+GSourceFuncs qio_channel_fd_source_funcs = {
+ qio_channel_fd_source_prepare,
+ qio_channel_fd_source_check,
+ qio_channel_fd_source_dispatch,
+ qio_channel_fd_source_finalize
+};
+
+
+GSourceFuncs qio_channel_fd_pair_source_funcs = {
+ qio_channel_fd_pair_source_prepare,
+ qio_channel_fd_pair_source_check,
+ qio_channel_fd_pair_source_dispatch,
+ qio_channel_fd_pair_source_finalize
+};
+
+
+GSource *qio_channel_create_fd_watch(QIOChannel *ioc,
+ int fd,
+ GIOCondition condition)
+{
+ GSource *source;
+ QIOChannelFDSource *ssource;
+
+ source = g_source_new(&qio_channel_fd_source_funcs,
+ sizeof(QIOChannelFDSource));
+ ssource = (QIOChannelFDSource *)source;
+
+ ssource->ioc = ioc;
+ object_ref(OBJECT(ioc));
+
+ ssource->condition = condition;
+
+#ifdef CONFIG_WIN32
+ ssource->fd.fd = (gint64)_get_osfhandle(fd);
+#else
+ ssource->fd.fd = fd;
+#endif
+ ssource->fd.events = condition;
+
+ g_source_add_poll(source, &ssource->fd);
+
+ return source;
+}
+
+#ifdef CONFIG_WIN32
+GSource *qio_channel_create_socket_watch(QIOChannel *ioc,
+ int socket,
+ GIOCondition condition)
+{
+ GSource *source;
+ QIOChannelSocketSource *ssource;
+
+#ifdef WIN32
+ WSAEventSelect(socket, ioc->event,
+ FD_READ | FD_ACCEPT | FD_CLOSE |
+ FD_CONNECT | FD_WRITE | FD_OOB);
+#endif
+
+ source = g_source_new(&qio_channel_socket_source_funcs,
+ sizeof(QIOChannelSocketSource));
+ ssource = (QIOChannelSocketSource *)source;
+
+ ssource->ioc = ioc;
+ object_ref(OBJECT(ioc));
+
+ ssource->condition = condition;
+ ssource->socket = socket;
+ ssource->revents = 0;
+
+ ssource->fd.fd = (gintptr)ioc->event;
+ ssource->fd.events = G_IO_IN;
+
+ g_source_add_poll(source, &ssource->fd);
+
+ return source;
+}
+#else
+GSource *qio_channel_create_socket_watch(QIOChannel *ioc,
+ int socket,
+ GIOCondition condition)
+{
+ return qio_channel_create_fd_watch(ioc, socket, condition);
+}
+#endif
+
+GSource *qio_channel_create_fd_pair_watch(QIOChannel *ioc,
+ int fdread,
+ int fdwrite,
+ GIOCondition condition)
+{
+ GSource *source;
+ QIOChannelFDPairSource *ssource;
+
+ source = g_source_new(&qio_channel_fd_pair_source_funcs,
+ sizeof(QIOChannelFDPairSource));
+ ssource = (QIOChannelFDPairSource *)source;
+
+ ssource->ioc = ioc;
+ object_ref(OBJECT(ioc));
+
+ ssource->condition = condition;
+
+#ifdef CONFIG_WIN32
+ ssource->fdread.fd = (gint64)_get_osfhandle(fdread);
+ ssource->fdwrite.fd = (gint64)_get_osfhandle(fdwrite);
+#else
+ ssource->fdread.fd = fdread;
+ ssource->fdwrite.fd = fdwrite;
+#endif
+
+ ssource->fdread.events = condition & G_IO_IN;
+ ssource->fdwrite.events = condition & G_IO_OUT;
+
+ g_source_add_poll(source, &ssource->fdread);
+ g_source_add_poll(source, &ssource->fdwrite);
+
+ return source;
+}
diff --git a/io/channel-websock.c b/io/channel-websock.c
new file mode 100644
index 000000000..70889bb54
--- /dev/null
+++ b/io/channel-websock.c
@@ -0,0 +1,1335 @@
+/*
+ * QEMU I/O channels driver websockets
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qapi/error.h"
+#include "qemu/bswap.h"
+#include "io/channel-websock.h"
+#include "crypto/hash.h"
+#include "trace.h"
+#include "qemu/iov.h"
+#include "qemu/module.h"
+
+/* Max amount to allow in rawinput/encoutput buffers */
+#define QIO_CHANNEL_WEBSOCK_MAX_BUFFER 8192
+
+#define QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN 24
+#define QIO_CHANNEL_WEBSOCK_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+#define QIO_CHANNEL_WEBSOCK_GUID_LEN strlen(QIO_CHANNEL_WEBSOCK_GUID)
+
+#define QIO_CHANNEL_WEBSOCK_HEADER_PROTOCOL "sec-websocket-protocol"
+#define QIO_CHANNEL_WEBSOCK_HEADER_VERSION "sec-websocket-version"
+#define QIO_CHANNEL_WEBSOCK_HEADER_KEY "sec-websocket-key"
+#define QIO_CHANNEL_WEBSOCK_HEADER_UPGRADE "upgrade"
+#define QIO_CHANNEL_WEBSOCK_HEADER_HOST "host"
+#define QIO_CHANNEL_WEBSOCK_HEADER_CONNECTION "connection"
+
+#define QIO_CHANNEL_WEBSOCK_PROTOCOL_BINARY "binary"
+#define QIO_CHANNEL_WEBSOCK_CONNECTION_UPGRADE "Upgrade"
+#define QIO_CHANNEL_WEBSOCK_UPGRADE_WEBSOCKET "websocket"
+
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_COMMON \
+ "Server: QEMU VNC\r\n" \
+ "Date: %s\r\n"
+
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_WITH_PROTO_RES_OK \
+ "HTTP/1.1 101 Switching Protocols\r\n" \
+ QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_COMMON \
+ "Upgrade: websocket\r\n" \
+ "Connection: Upgrade\r\n" \
+ "Sec-WebSocket-Accept: %s\r\n" \
+ "Sec-WebSocket-Protocol: binary\r\n" \
+ "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_OK \
+ "HTTP/1.1 101 Switching Protocols\r\n" \
+ QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_COMMON \
+ "Upgrade: websocket\r\n" \
+ "Connection: Upgrade\r\n" \
+ "Sec-WebSocket-Accept: %s\r\n" \
+ "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_NOT_FOUND \
+ "HTTP/1.1 404 Not Found\r\n" \
+ QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_COMMON \
+ "Connection: close\r\n" \
+ "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_BAD_REQUEST \
+ "HTTP/1.1 400 Bad Request\r\n" \
+ QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_COMMON \
+ "Connection: close\r\n" \
+ "Sec-WebSocket-Version: " \
+ QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION \
+ "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_SERVER_ERR \
+ "HTTP/1.1 500 Internal Server Error\r\n" \
+ QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_COMMON \
+ "Connection: close\r\n" \
+ "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_TOO_LARGE \
+ "HTTP/1.1 403 Request Entity Too Large\r\n" \
+ QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_COMMON \
+ "Connection: close\r\n" \
+ "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_END "\r\n\r\n"
+#define QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION "13"
+#define QIO_CHANNEL_WEBSOCK_HTTP_METHOD "GET"
+#define QIO_CHANNEL_WEBSOCK_HTTP_PATH "/"
+#define QIO_CHANNEL_WEBSOCK_HTTP_VERSION "HTTP/1.1"
+
+/* The websockets packet header is variable length
+ * depending on the size of the payload... */
+
+/* ...length when using 7-bit payload length */
+#define QIO_CHANNEL_WEBSOCK_HEADER_LEN_7_BIT 6
+/* ...length when using 16-bit payload length */
+#define QIO_CHANNEL_WEBSOCK_HEADER_LEN_16_BIT 8
+/* ...length when using 64-bit payload length */
+#define QIO_CHANNEL_WEBSOCK_HEADER_LEN_64_BIT 14
+
+/* Length of the optional data mask field in header */
+#define QIO_CHANNEL_WEBSOCK_HEADER_LEN_MASK 4
+
+/* Maximum length that can fit in 7-bit payload size */
+#define QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_THRESHOLD_7_BIT 126
+/* Maximum length that can fit in 16-bit payload size */
+#define QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_THRESHOLD_16_BIT 65536
+
+/* Magic 7-bit length to indicate use of 16-bit payload length */
+#define QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_MAGIC_16_BIT 126
+/* Magic 7-bit length to indicate use of 64-bit payload length */
+#define QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_MAGIC_64_BIT 127
+
+/* Bitmasks for accessing header fields */
+#define QIO_CHANNEL_WEBSOCK_HEADER_FIELD_FIN 0x80
+#define QIO_CHANNEL_WEBSOCK_HEADER_FIELD_OPCODE 0x0f
+#define QIO_CHANNEL_WEBSOCK_HEADER_FIELD_HAS_MASK 0x80
+#define QIO_CHANNEL_WEBSOCK_HEADER_FIELD_PAYLOAD_LEN 0x7f
+#define QIO_CHANNEL_WEBSOCK_CONTROL_OPCODE_MASK 0x8
+
+typedef struct QIOChannelWebsockHeader QIOChannelWebsockHeader;
+
+struct QEMU_PACKED QIOChannelWebsockHeader {
+ unsigned char b0;
+ unsigned char b1;
+ union {
+ struct QEMU_PACKED {
+ uint16_t l16;
+ QIOChannelWebsockMask m16;
+ } s16;
+ struct QEMU_PACKED {
+ uint64_t l64;
+ QIOChannelWebsockMask m64;
+ } s64;
+ QIOChannelWebsockMask m;
+ } u;
+};
+
+typedef struct QIOChannelWebsockHTTPHeader QIOChannelWebsockHTTPHeader;
+
+struct QIOChannelWebsockHTTPHeader {
+ char *name;
+ char *value;
+};
+
+enum {
+ QIO_CHANNEL_WEBSOCK_OPCODE_CONTINUATION = 0x0,
+ QIO_CHANNEL_WEBSOCK_OPCODE_TEXT_FRAME = 0x1,
+ QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME = 0x2,
+ QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE = 0x8,
+ QIO_CHANNEL_WEBSOCK_OPCODE_PING = 0x9,
+ QIO_CHANNEL_WEBSOCK_OPCODE_PONG = 0xA
+};
+
+static void GCC_FMT_ATTR(2, 3)
+qio_channel_websock_handshake_send_res(QIOChannelWebsock *ioc,
+ const char *resmsg,
+ ...)
+{
+ va_list vargs;
+ char *response;
+ size_t responselen;
+
+ va_start(vargs, resmsg);
+ response = g_strdup_vprintf(resmsg, vargs);
+ responselen = strlen(response);
+ buffer_reserve(&ioc->encoutput, responselen);
+ buffer_append(&ioc->encoutput, response, responselen);
+ g_free(response);
+ va_end(vargs);
+}
+
+static gchar *qio_channel_websock_date_str(void)
+{
+ g_autoptr(GDateTime) now = g_date_time_new_now_utc();
+
+ return g_date_time_format(now, "%a, %d %b %Y %H:%M:%S GMT");
+}
+
+static void qio_channel_websock_handshake_send_res_err(QIOChannelWebsock *ioc,
+ const char *resdata)
+{
+ char *date = qio_channel_websock_date_str();
+ qio_channel_websock_handshake_send_res(ioc, resdata, date);
+ g_free(date);
+}
+
+enum {
+ QIO_CHANNEL_WEBSOCK_STATUS_NORMAL = 1000,
+ QIO_CHANNEL_WEBSOCK_STATUS_PROTOCOL_ERR = 1002,
+ QIO_CHANNEL_WEBSOCK_STATUS_INVALID_DATA = 1003,
+ QIO_CHANNEL_WEBSOCK_STATUS_POLICY = 1008,
+ QIO_CHANNEL_WEBSOCK_STATUS_TOO_LARGE = 1009,
+ QIO_CHANNEL_WEBSOCK_STATUS_SERVER_ERR = 1011,
+};
+
+static size_t
+qio_channel_websock_extract_headers(QIOChannelWebsock *ioc,
+ char *buffer,
+ QIOChannelWebsockHTTPHeader *hdrs,
+ size_t nhdrsalloc,
+ Error **errp)
+{
+ char *nl, *sep, *tmp;
+ size_t nhdrs = 0;
+
+ /*
+ * First parse the HTTP protocol greeting of format:
+ *
+ * $METHOD $PATH $VERSION
+ *
+ * e.g.
+ *
+ * GET / HTTP/1.1
+ */
+
+ nl = strstr(buffer, QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM);
+ if (!nl) {
+ error_setg(errp, "Missing HTTP header delimiter");
+ goto bad_request;
+ }
+ *nl = '\0';
+ trace_qio_channel_websock_http_greeting(ioc, buffer);
+
+ tmp = strchr(buffer, ' ');
+ if (!tmp) {
+ error_setg(errp, "Missing HTTP path delimiter");
+ return 0;
+ }
+ *tmp = '\0';
+
+ if (!g_str_equal(buffer, QIO_CHANNEL_WEBSOCK_HTTP_METHOD)) {
+ error_setg(errp, "Unsupported HTTP method %s", buffer);
+ goto bad_request;
+ }
+
+ buffer = tmp + 1;
+ tmp = strchr(buffer, ' ');
+ if (!tmp) {
+ error_setg(errp, "Missing HTTP version delimiter");
+ goto bad_request;
+ }
+ *tmp = '\0';
+
+ if (!g_str_equal(buffer, QIO_CHANNEL_WEBSOCK_HTTP_PATH)) {
+ qio_channel_websock_handshake_send_res_err(
+ ioc, QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_NOT_FOUND);
+ error_setg(errp, "Unexpected HTTP path %s", buffer);
+ return 0;
+ }
+
+ buffer = tmp + 1;
+
+ if (!g_str_equal(buffer, QIO_CHANNEL_WEBSOCK_HTTP_VERSION)) {
+ error_setg(errp, "Unsupported HTTP version %s", buffer);
+ goto bad_request;
+ }
+
+ buffer = nl + strlen(QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM);
+
+ /*
+ * Now parse all the header fields of format
+ *
+ * $NAME: $VALUE
+ *
+ * e.g.
+ *
+ * Cache-control: no-cache
+ */
+ do {
+ QIOChannelWebsockHTTPHeader *hdr;
+
+ nl = strstr(buffer, QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM);
+ if (nl) {
+ *nl = '\0';
+ }
+
+ sep = strchr(buffer, ':');
+ if (!sep) {
+ error_setg(errp, "Malformed HTTP header");
+ goto bad_request;
+ }
+ *sep = '\0';
+ sep++;
+ while (*sep == ' ') {
+ sep++;
+ }
+
+ if (nhdrs >= nhdrsalloc) {
+ error_setg(errp, "Too many HTTP headers");
+ goto bad_request;
+ }
+
+ hdr = &hdrs[nhdrs++];
+ hdr->name = buffer;
+ hdr->value = sep;
+
+ /* Canonicalize header name for easier identification later */
+ for (tmp = hdr->name; *tmp; tmp++) {
+ *tmp = g_ascii_tolower(*tmp);
+ }
+
+ if (nl) {
+ buffer = nl + strlen(QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM);
+ }
+ } while (nl != NULL);
+
+ return nhdrs;
+
+ bad_request:
+ qio_channel_websock_handshake_send_res_err(
+ ioc, QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_BAD_REQUEST);
+ return 0;
+}
+
+static const char *
+qio_channel_websock_find_header(QIOChannelWebsockHTTPHeader *hdrs,
+ size_t nhdrs,
+ const char *name)
+{
+ size_t i;
+
+ for (i = 0; i < nhdrs; i++) {
+ if (g_str_equal(hdrs[i].name, name)) {
+ return hdrs[i].value;
+ }
+ }
+
+ return NULL;
+}
+
+
+static void qio_channel_websock_handshake_send_res_ok(QIOChannelWebsock *ioc,
+ const char *key,
+ const bool use_protocols,
+ Error **errp)
+{
+ char combined_key[QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN +
+ QIO_CHANNEL_WEBSOCK_GUID_LEN + 1];
+ char *accept = NULL;
+ char *date = NULL;
+
+ g_strlcpy(combined_key, key, QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN + 1);
+ g_strlcat(combined_key, QIO_CHANNEL_WEBSOCK_GUID,
+ QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN +
+ QIO_CHANNEL_WEBSOCK_GUID_LEN + 1);
+
+ /* hash and encode it */
+ if (qcrypto_hash_base64(QCRYPTO_HASH_ALG_SHA1,
+ combined_key,
+ QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN +
+ QIO_CHANNEL_WEBSOCK_GUID_LEN,
+ &accept,
+ errp) < 0) {
+ qio_channel_websock_handshake_send_res_err(
+ ioc, QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_SERVER_ERR);
+ return;
+ }
+
+ date = qio_channel_websock_date_str();
+ if (use_protocols) {
+ qio_channel_websock_handshake_send_res(
+ ioc, QIO_CHANNEL_WEBSOCK_HANDSHAKE_WITH_PROTO_RES_OK,
+ date, accept);
+ } else {
+ qio_channel_websock_handshake_send_res(
+ ioc, QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_OK, date, accept);
+ }
+
+ g_free(date);
+ g_free(accept);
+}
+
+static void qio_channel_websock_handshake_process(QIOChannelWebsock *ioc,
+ char *buffer,
+ Error **errp)
+{
+ QIOChannelWebsockHTTPHeader hdrs[32];
+ size_t nhdrs = G_N_ELEMENTS(hdrs);
+ const char *protocols = NULL, *version = NULL, *key = NULL,
+ *host = NULL, *connection = NULL, *upgrade = NULL;
+ char **connectionv;
+ bool upgraded = false;
+ size_t i;
+
+ nhdrs = qio_channel_websock_extract_headers(ioc, buffer, hdrs, nhdrs, errp);
+ if (!nhdrs) {
+ return;
+ }
+
+ protocols = qio_channel_websock_find_header(
+ hdrs, nhdrs, QIO_CHANNEL_WEBSOCK_HEADER_PROTOCOL);
+
+ version = qio_channel_websock_find_header(
+ hdrs, nhdrs, QIO_CHANNEL_WEBSOCK_HEADER_VERSION);
+ if (!version) {
+ error_setg(errp, "Missing websocket version header data");
+ goto bad_request;
+ }
+
+ key = qio_channel_websock_find_header(
+ hdrs, nhdrs, QIO_CHANNEL_WEBSOCK_HEADER_KEY);
+ if (!key) {
+ error_setg(errp, "Missing websocket key header data");
+ goto bad_request;
+ }
+
+ host = qio_channel_websock_find_header(
+ hdrs, nhdrs, QIO_CHANNEL_WEBSOCK_HEADER_HOST);
+ if (!host) {
+ error_setg(errp, "Missing websocket host header data");
+ goto bad_request;
+ }
+
+ connection = qio_channel_websock_find_header(
+ hdrs, nhdrs, QIO_CHANNEL_WEBSOCK_HEADER_CONNECTION);
+ if (!connection) {
+ error_setg(errp, "Missing websocket connection header data");
+ goto bad_request;
+ }
+
+ upgrade = qio_channel_websock_find_header(
+ hdrs, nhdrs, QIO_CHANNEL_WEBSOCK_HEADER_UPGRADE);
+ if (!upgrade) {
+ error_setg(errp, "Missing websocket upgrade header data");
+ goto bad_request;
+ }
+
+ trace_qio_channel_websock_http_request(ioc, protocols, version,
+ host, connection, upgrade, key);
+
+ if (protocols) {
+ if (!g_strrstr(protocols, QIO_CHANNEL_WEBSOCK_PROTOCOL_BINARY)) {
+ error_setg(errp, "No '%s' protocol is supported by client '%s'",
+ QIO_CHANNEL_WEBSOCK_PROTOCOL_BINARY, protocols);
+ goto bad_request;
+ }
+ }
+
+ if (!g_str_equal(version, QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION)) {
+ error_setg(errp, "Version '%s' is not supported by client '%s'",
+ QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION, version);
+ goto bad_request;
+ }
+
+ if (strlen(key) != QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN) {
+ error_setg(errp, "Key length '%zu' was not as expected '%d'",
+ strlen(key), QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN);
+ goto bad_request;
+ }
+
+ connectionv = g_strsplit(connection, ",", 0);
+ for (i = 0; connectionv != NULL && connectionv[i] != NULL; i++) {
+ g_strstrip(connectionv[i]);
+ if (strcasecmp(connectionv[i],
+ QIO_CHANNEL_WEBSOCK_CONNECTION_UPGRADE) == 0) {
+ upgraded = true;
+ }
+ }
+ g_strfreev(connectionv);
+ if (!upgraded) {
+ error_setg(errp, "No connection upgrade requested '%s'", connection);
+ goto bad_request;
+ }
+
+ if (strcasecmp(upgrade, QIO_CHANNEL_WEBSOCK_UPGRADE_WEBSOCKET) != 0) {
+ error_setg(errp, "Incorrect upgrade method '%s'", upgrade);
+ goto bad_request;
+ }
+
+ qio_channel_websock_handshake_send_res_ok(ioc, key, !!protocols, errp);
+ return;
+
+ bad_request:
+ qio_channel_websock_handshake_send_res_err(
+ ioc, QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_BAD_REQUEST);
+}
+
+static int qio_channel_websock_handshake_read(QIOChannelWebsock *ioc,
+ Error **errp)
+{
+ char *handshake_end;
+ ssize_t ret;
+ /* Typical HTTP headers from novnc are 512 bytes, so limiting
+ * total header size to 4096 is easily enough. */
+ size_t want = 4096 - ioc->encinput.offset;
+ buffer_reserve(&ioc->encinput, want);
+ ret = qio_channel_read(ioc->master,
+ (char *)buffer_end(&ioc->encinput), want, errp);
+ if (ret < 0) {
+ return -1;
+ }
+ ioc->encinput.offset += ret;
+
+ handshake_end = g_strstr_len((char *)ioc->encinput.buffer,
+ ioc->encinput.offset,
+ QIO_CHANNEL_WEBSOCK_HANDSHAKE_END);
+ if (!handshake_end) {
+ if (ioc->encinput.offset >= 4096) {
+ qio_channel_websock_handshake_send_res_err(
+ ioc, QIO_CHANNEL_WEBSOCK_HANDSHAKE_RES_TOO_LARGE);
+ error_setg(errp,
+ "End of headers not found in first 4096 bytes");
+ return 1;
+ } else if (ret == 0) {
+ error_setg(errp,
+ "End of headers not found before connection closed");
+ return -1;
+ }
+ return 0;
+ }
+ *handshake_end = '\0';
+
+ qio_channel_websock_handshake_process(ioc,
+ (char *)ioc->encinput.buffer,
+ errp);
+
+ buffer_advance(&ioc->encinput,
+ handshake_end - (char *)ioc->encinput.buffer +
+ strlen(QIO_CHANNEL_WEBSOCK_HANDSHAKE_END));
+ return 1;
+}
+
+static gboolean qio_channel_websock_handshake_send(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer user_data)
+{
+ QIOTask *task = user_data;
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(
+ qio_task_get_source(task));
+ Error *err = NULL;
+ ssize_t ret;
+
+ ret = qio_channel_write(wioc->master,
+ (char *)wioc->encoutput.buffer,
+ wioc->encoutput.offset,
+ &err);
+
+ if (ret < 0) {
+ trace_qio_channel_websock_handshake_fail(ioc, error_get_pretty(err));
+ qio_task_set_error(task, err);
+ qio_task_complete(task);
+ return FALSE;
+ }
+
+ buffer_advance(&wioc->encoutput, ret);
+ if (wioc->encoutput.offset == 0) {
+ if (wioc->io_err) {
+ trace_qio_channel_websock_handshake_fail(
+ ioc, error_get_pretty(wioc->io_err));
+ qio_task_set_error(task, wioc->io_err);
+ wioc->io_err = NULL;
+ qio_task_complete(task);
+ } else {
+ trace_qio_channel_websock_handshake_complete(ioc);
+ qio_task_complete(task);
+ }
+ return FALSE;
+ }
+ trace_qio_channel_websock_handshake_pending(ioc, G_IO_OUT);
+ return TRUE;
+}
+
+static gboolean qio_channel_websock_handshake_io(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer user_data)
+{
+ QIOTask *task = user_data;
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(
+ qio_task_get_source(task));
+ Error *err = NULL;
+ int ret;
+
+ ret = qio_channel_websock_handshake_read(wioc, &err);
+ if (ret < 0) {
+ /*
+ * We only take this path on a fatal I/O error reading from
+ * client connection, as most of the time we have an
+ * HTTP 4xx err response to send instead
+ */
+ trace_qio_channel_websock_handshake_fail(ioc, error_get_pretty(err));
+ qio_task_set_error(task, err);
+ qio_task_complete(task);
+ return FALSE;
+ }
+ if (ret == 0) {
+ trace_qio_channel_websock_handshake_pending(ioc, G_IO_IN);
+ /* need more data still */
+ return TRUE;
+ }
+
+ error_propagate(&wioc->io_err, err);
+
+ trace_qio_channel_websock_handshake_reply(ioc);
+ qio_channel_add_watch(
+ wioc->master,
+ G_IO_OUT,
+ qio_channel_websock_handshake_send,
+ task,
+ NULL);
+ return FALSE;
+}
+
+
+static void qio_channel_websock_encode(QIOChannelWebsock *ioc,
+ uint8_t opcode,
+ const struct iovec *iov,
+ size_t niov,
+ size_t size)
+{
+ size_t header_size;
+ size_t i;
+ union {
+ char buf[QIO_CHANNEL_WEBSOCK_HEADER_LEN_64_BIT];
+ QIOChannelWebsockHeader ws;
+ } header;
+
+ assert(size <= iov_size(iov, niov));
+
+ header.ws.b0 = QIO_CHANNEL_WEBSOCK_HEADER_FIELD_FIN |
+ (opcode & QIO_CHANNEL_WEBSOCK_HEADER_FIELD_OPCODE);
+ if (size < QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_THRESHOLD_7_BIT) {
+ header.ws.b1 = (uint8_t)size;
+ header_size = QIO_CHANNEL_WEBSOCK_HEADER_LEN_7_BIT;
+ } else if (size < QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_THRESHOLD_16_BIT) {
+ header.ws.b1 = QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_MAGIC_16_BIT;
+ header.ws.u.s16.l16 = cpu_to_be16((uint16_t)size);
+ header_size = QIO_CHANNEL_WEBSOCK_HEADER_LEN_16_BIT;
+ } else {
+ header.ws.b1 = QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_MAGIC_64_BIT;
+ header.ws.u.s64.l64 = cpu_to_be64(size);
+ header_size = QIO_CHANNEL_WEBSOCK_HEADER_LEN_64_BIT;
+ }
+ header_size -= QIO_CHANNEL_WEBSOCK_HEADER_LEN_MASK;
+
+ trace_qio_channel_websock_encode(ioc, opcode, header_size, size);
+ buffer_reserve(&ioc->encoutput, header_size + size);
+ buffer_append(&ioc->encoutput, header.buf, header_size);
+ for (i = 0; i < niov && size != 0; i++) {
+ size_t want = iov[i].iov_len;
+ if (want > size) {
+ want = size;
+ }
+ buffer_append(&ioc->encoutput, iov[i].iov_base, want);
+ size -= want;
+ }
+}
+
+
+static ssize_t qio_channel_websock_write_wire(QIOChannelWebsock *, Error **);
+
+
+static void qio_channel_websock_write_close(QIOChannelWebsock *ioc,
+ uint16_t code, const char *reason)
+{
+ struct iovec iov[2] = {
+ { .iov_base = &code, .iov_len = sizeof(code) },
+ };
+ size_t niov = 1;
+ size_t size = iov[0].iov_len;
+
+ cpu_to_be16s(&code);
+
+ if (reason) {
+ iov[1].iov_base = (void *)reason;
+ iov[1].iov_len = strlen(reason);
+ size += iov[1].iov_len;
+ niov++;
+ }
+ qio_channel_websock_encode(ioc, QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE,
+ iov, niov, size);
+ qio_channel_websock_write_wire(ioc, NULL);
+ qio_channel_shutdown(ioc->master, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+}
+
+
+static int qio_channel_websock_decode_header(QIOChannelWebsock *ioc,
+ Error **errp)
+{
+ unsigned char opcode, fin, has_mask;
+ size_t header_size;
+ size_t payload_len;
+ QIOChannelWebsockHeader *header =
+ (QIOChannelWebsockHeader *)ioc->encinput.buffer;
+
+ if (ioc->payload_remain) {
+ error_setg(errp,
+ "Decoding header but %zu bytes of payload remain",
+ ioc->payload_remain);
+ qio_channel_websock_write_close(
+ ioc, QIO_CHANNEL_WEBSOCK_STATUS_SERVER_ERR,
+ "internal server error");
+ return -1;
+ }
+ if (ioc->encinput.offset < QIO_CHANNEL_WEBSOCK_HEADER_LEN_7_BIT) {
+ /* header not complete */
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+
+ fin = header->b0 & QIO_CHANNEL_WEBSOCK_HEADER_FIELD_FIN;
+ opcode = header->b0 & QIO_CHANNEL_WEBSOCK_HEADER_FIELD_OPCODE;
+ has_mask = header->b1 & QIO_CHANNEL_WEBSOCK_HEADER_FIELD_HAS_MASK;
+ payload_len = header->b1 & QIO_CHANNEL_WEBSOCK_HEADER_FIELD_PAYLOAD_LEN;
+
+ /* Save or restore opcode. */
+ if (opcode) {
+ ioc->opcode = opcode;
+ } else {
+ opcode = ioc->opcode;
+ }
+
+ trace_qio_channel_websock_header_partial_decode(ioc, payload_len,
+ fin, opcode, (int)has_mask);
+
+ if (opcode == QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE) {
+ /* disconnect */
+ return 0;
+ }
+
+ /* Websocket frame sanity check:
+ * * Fragmentation is only supported for binary frames.
+ * * All frames sent by a client MUST be masked.
+ * * Only binary and ping/pong encoding is supported.
+ */
+ if (!fin) {
+ if (opcode != QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME) {
+ error_setg(errp, "only binary websocket frames may be fragmented");
+ qio_channel_websock_write_close(
+ ioc, QIO_CHANNEL_WEBSOCK_STATUS_POLICY ,
+ "only binary frames may be fragmented");
+ return -1;
+ }
+ } else {
+ if (opcode != QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME &&
+ opcode != QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE &&
+ opcode != QIO_CHANNEL_WEBSOCK_OPCODE_PING &&
+ opcode != QIO_CHANNEL_WEBSOCK_OPCODE_PONG) {
+ error_setg(errp, "unsupported opcode: 0x%04x; only binary, close, "
+ "ping, and pong websocket frames are supported", opcode);
+ qio_channel_websock_write_close(
+ ioc, QIO_CHANNEL_WEBSOCK_STATUS_INVALID_DATA ,
+ "only binary, close, ping, and pong frames are supported");
+ return -1;
+ }
+ }
+ if (!has_mask) {
+ error_setg(errp, "client websocket frames must be masked");
+ qio_channel_websock_write_close(
+ ioc, QIO_CHANNEL_WEBSOCK_STATUS_PROTOCOL_ERR,
+ "client frames must be masked");
+ return -1;
+ }
+
+ if (payload_len < QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_MAGIC_16_BIT) {
+ ioc->payload_remain = payload_len;
+ header_size = QIO_CHANNEL_WEBSOCK_HEADER_LEN_7_BIT;
+ ioc->mask = header->u.m;
+ } else if (opcode & QIO_CHANNEL_WEBSOCK_CONTROL_OPCODE_MASK) {
+ error_setg(errp, "websocket control frame is too large");
+ qio_channel_websock_write_close(
+ ioc, QIO_CHANNEL_WEBSOCK_STATUS_PROTOCOL_ERR,
+ "control frame is too large");
+ return -1;
+ } else if (payload_len == QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_MAGIC_16_BIT &&
+ ioc->encinput.offset >= QIO_CHANNEL_WEBSOCK_HEADER_LEN_16_BIT) {
+ ioc->payload_remain = be16_to_cpu(header->u.s16.l16);
+ header_size = QIO_CHANNEL_WEBSOCK_HEADER_LEN_16_BIT;
+ ioc->mask = header->u.s16.m16;
+ } else if (payload_len == QIO_CHANNEL_WEBSOCK_PAYLOAD_LEN_MAGIC_64_BIT &&
+ ioc->encinput.offset >= QIO_CHANNEL_WEBSOCK_HEADER_LEN_64_BIT) {
+ ioc->payload_remain = be64_to_cpu(header->u.s64.l64);
+ header_size = QIO_CHANNEL_WEBSOCK_HEADER_LEN_64_BIT;
+ ioc->mask = header->u.s64.m64;
+ } else {
+ /* header not complete */
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+
+ trace_qio_channel_websock_header_full_decode(
+ ioc, header_size, ioc->payload_remain, ioc->mask.u);
+ buffer_advance(&ioc->encinput, header_size);
+ return 0;
+}
+
+
+static int qio_channel_websock_decode_payload(QIOChannelWebsock *ioc,
+ Error **errp)
+{
+ size_t i;
+ size_t payload_len = 0;
+ uint32_t *payload32;
+
+ if (ioc->payload_remain) {
+ /* If we aren't at the end of the payload, then drop
+ * off the last bytes, so we're always multiple of 4
+ * for purpose of unmasking, except at end of payload
+ */
+ if (ioc->encinput.offset < ioc->payload_remain) {
+ /* Wait for the entire payload before processing control frames
+ * because the payload will most likely be echoed back. */
+ if (ioc->opcode & QIO_CHANNEL_WEBSOCK_CONTROL_OPCODE_MASK) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+ payload_len = ioc->encinput.offset - (ioc->encinput.offset % 4);
+ } else {
+ payload_len = ioc->payload_remain;
+ }
+ if (payload_len == 0) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+
+ ioc->payload_remain -= payload_len;
+
+ /* unmask frame */
+ /* process 1 frame (32 bit op) */
+ payload32 = (uint32_t *)ioc->encinput.buffer;
+ for (i = 0; i < payload_len / 4; i++) {
+ payload32[i] ^= ioc->mask.u;
+ }
+ /* process the remaining bytes (if any) */
+ for (i *= 4; i < payload_len; i++) {
+ ioc->encinput.buffer[i] ^= ioc->mask.c[i % 4];
+ }
+ }
+
+ trace_qio_channel_websock_payload_decode(
+ ioc, ioc->opcode, ioc->payload_remain);
+
+ if (ioc->opcode == QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME) {
+ if (payload_len) {
+ /* binary frames are passed on */
+ buffer_reserve(&ioc->rawinput, payload_len);
+ buffer_append(&ioc->rawinput, ioc->encinput.buffer, payload_len);
+ }
+ } else if (ioc->opcode == QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE) {
+ /* close frames are echoed back */
+ error_setg(errp, "websocket closed by peer");
+ if (payload_len) {
+ /* echo client status */
+ struct iovec iov = { .iov_base = ioc->encinput.buffer,
+ .iov_len = ioc->encinput.offset };
+ qio_channel_websock_encode(ioc, QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE,
+ &iov, 1, iov.iov_len);
+ qio_channel_websock_write_wire(ioc, NULL);
+ qio_channel_shutdown(ioc->master, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ } else {
+ /* send our own status */
+ qio_channel_websock_write_close(
+ ioc, QIO_CHANNEL_WEBSOCK_STATUS_NORMAL, "peer requested close");
+ }
+ return -1;
+ } else if (ioc->opcode == QIO_CHANNEL_WEBSOCK_OPCODE_PING) {
+ /* ping frames produce an immediate reply, as long as we've not still
+ * got a previous pong queued, in which case we drop the new pong */
+ if (ioc->pong_remain == 0) {
+ struct iovec iov = { .iov_base = ioc->encinput.buffer,
+ .iov_len = ioc->encinput.offset };
+ qio_channel_websock_encode(ioc, QIO_CHANNEL_WEBSOCK_OPCODE_PONG,
+ &iov, 1, iov.iov_len);
+ ioc->pong_remain = ioc->encoutput.offset;
+ }
+ } /* pong frames are ignored */
+
+ if (payload_len) {
+ buffer_advance(&ioc->encinput, payload_len);
+ }
+ return 0;
+}
+
+
+QIOChannelWebsock *
+qio_channel_websock_new_server(QIOChannel *master)
+{
+ QIOChannelWebsock *wioc;
+ QIOChannel *ioc;
+
+ wioc = QIO_CHANNEL_WEBSOCK(object_new(TYPE_QIO_CHANNEL_WEBSOCK));
+ ioc = QIO_CHANNEL(wioc);
+
+ wioc->master = master;
+ if (qio_channel_has_feature(master, QIO_CHANNEL_FEATURE_SHUTDOWN)) {
+ qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
+ }
+ object_ref(OBJECT(master));
+
+ trace_qio_channel_websock_new_server(wioc, master);
+ return wioc;
+}
+
+void qio_channel_websock_handshake(QIOChannelWebsock *ioc,
+ QIOTaskFunc func,
+ gpointer opaque,
+ GDestroyNotify destroy)
+{
+ QIOTask *task;
+
+ task = qio_task_new(OBJECT(ioc),
+ func,
+ opaque,
+ destroy);
+
+ trace_qio_channel_websock_handshake_start(ioc);
+ trace_qio_channel_websock_handshake_pending(ioc, G_IO_IN);
+ qio_channel_add_watch(ioc->master,
+ G_IO_IN,
+ qio_channel_websock_handshake_io,
+ task,
+ NULL);
+}
+
+
+static void qio_channel_websock_finalize(Object *obj)
+{
+ QIOChannelWebsock *ioc = QIO_CHANNEL_WEBSOCK(obj);
+
+ buffer_free(&ioc->encinput);
+ buffer_free(&ioc->encoutput);
+ buffer_free(&ioc->rawinput);
+ object_unref(OBJECT(ioc->master));
+ if (ioc->io_tag) {
+ g_source_remove(ioc->io_tag);
+ }
+ if (ioc->io_err) {
+ error_free(ioc->io_err);
+ }
+}
+
+
+static ssize_t qio_channel_websock_read_wire(QIOChannelWebsock *ioc,
+ Error **errp)
+{
+ ssize_t ret;
+
+ if (ioc->encinput.offset < 4096) {
+ size_t want = 4096 - ioc->encinput.offset;
+
+ buffer_reserve(&ioc->encinput, want);
+ ret = qio_channel_read(ioc->master,
+ (char *)ioc->encinput.buffer +
+ ioc->encinput.offset,
+ want,
+ errp);
+ if (ret < 0) {
+ return ret;
+ }
+ if (ret == 0 && ioc->encinput.offset == 0) {
+ ioc->io_eof = TRUE;
+ return 0;
+ }
+ ioc->encinput.offset += ret;
+ }
+
+ while (ioc->encinput.offset != 0) {
+ if (ioc->payload_remain == 0) {
+ ret = qio_channel_websock_decode_header(ioc, errp);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ ret = qio_channel_websock_decode_payload(ioc, errp);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ return 1;
+}
+
+
+static ssize_t qio_channel_websock_write_wire(QIOChannelWebsock *ioc,
+ Error **errp)
+{
+ ssize_t ret;
+ ssize_t done = 0;
+
+ while (ioc->encoutput.offset > 0) {
+ ret = qio_channel_write(ioc->master,
+ (char *)ioc->encoutput.buffer,
+ ioc->encoutput.offset,
+ errp);
+ if (ret < 0) {
+ if (ret == QIO_CHANNEL_ERR_BLOCK &&
+ done > 0) {
+ return done;
+ } else {
+ return ret;
+ }
+ }
+ buffer_advance(&ioc->encoutput, ret);
+ done += ret;
+ if (ioc->pong_remain < ret) {
+ ioc->pong_remain = 0;
+ } else {
+ ioc->pong_remain -= ret;
+ }
+ }
+ return done;
+}
+
+
+static void qio_channel_websock_flush_free(gpointer user_data)
+{
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(user_data);
+ object_unref(OBJECT(wioc));
+}
+
+static void qio_channel_websock_set_watch(QIOChannelWebsock *ioc);
+
+static gboolean qio_channel_websock_flush(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer user_data)
+{
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(user_data);
+ ssize_t ret;
+
+ if (condition & G_IO_OUT) {
+ ret = qio_channel_websock_write_wire(wioc, &wioc->io_err);
+ if (ret < 0) {
+ goto cleanup;
+ }
+ }
+
+ if (condition & G_IO_IN) {
+ ret = qio_channel_websock_read_wire(wioc, &wioc->io_err);
+ if (ret < 0) {
+ goto cleanup;
+ }
+ }
+
+ cleanup:
+ qio_channel_websock_set_watch(wioc);
+ return FALSE;
+}
+
+
+static void qio_channel_websock_unset_watch(QIOChannelWebsock *ioc)
+{
+ if (ioc->io_tag) {
+ g_source_remove(ioc->io_tag);
+ ioc->io_tag = 0;
+ }
+}
+
+static void qio_channel_websock_set_watch(QIOChannelWebsock *ioc)
+{
+ GIOCondition cond = 0;
+
+ qio_channel_websock_unset_watch(ioc);
+
+ if (ioc->io_err) {
+ return;
+ }
+
+ if (ioc->encoutput.offset) {
+ cond |= G_IO_OUT;
+ }
+ if (ioc->encinput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER &&
+ !ioc->io_eof) {
+ cond |= G_IO_IN;
+ }
+
+ if (cond) {
+ object_ref(OBJECT(ioc));
+ ioc->io_tag =
+ qio_channel_add_watch(ioc->master,
+ cond,
+ qio_channel_websock_flush,
+ ioc,
+ qio_channel_websock_flush_free);
+ }
+}
+
+
+static ssize_t qio_channel_websock_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+ size_t i;
+ ssize_t got = 0;
+ ssize_t ret;
+
+ if (wioc->io_err) {
+ error_propagate(errp, error_copy(wioc->io_err));
+ return -1;
+ }
+
+ if (!wioc->rawinput.offset) {
+ ret = qio_channel_websock_read_wire(QIO_CHANNEL_WEBSOCK(ioc), errp);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ for (i = 0 ; i < niov ; i++) {
+ size_t want = iov[i].iov_len;
+ if (want > (wioc->rawinput.offset - got)) {
+ want = (wioc->rawinput.offset - got);
+ }
+
+ memcpy(iov[i].iov_base,
+ wioc->rawinput.buffer + got,
+ want);
+ got += want;
+
+ if (want < iov[i].iov_len) {
+ break;
+ }
+ }
+
+ buffer_advance(&wioc->rawinput, got);
+ qio_channel_websock_set_watch(wioc);
+ return got;
+}
+
+
+static ssize_t qio_channel_websock_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+ ssize_t want = iov_size(iov, niov);
+ ssize_t avail;
+ ssize_t ret;
+
+ if (wioc->io_err) {
+ error_propagate(errp, error_copy(wioc->io_err));
+ return -1;
+ }
+
+ if (wioc->io_eof) {
+ error_setg(errp, "%s", "Broken pipe");
+ return -1;
+ }
+
+ avail = wioc->encoutput.offset >= QIO_CHANNEL_WEBSOCK_MAX_BUFFER ?
+ 0 : (QIO_CHANNEL_WEBSOCK_MAX_BUFFER - wioc->encoutput.offset);
+ if (want > avail) {
+ want = avail;
+ }
+
+ if (want) {
+ qio_channel_websock_encode(wioc,
+ QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME,
+ iov, niov, want);
+ }
+
+ /* Even if want == 0, we'll try write_wire in case there's
+ * pending data we could usefully flush out
+ */
+ ret = qio_channel_websock_write_wire(wioc, errp);
+ if (ret < 0 &&
+ ret != QIO_CHANNEL_ERR_BLOCK) {
+ qio_channel_websock_unset_watch(wioc);
+ return -1;
+ }
+
+ qio_channel_websock_set_watch(wioc);
+
+ if (want == 0) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ }
+
+ return want;
+}
+
+static int qio_channel_websock_set_blocking(QIOChannel *ioc,
+ bool enabled,
+ Error **errp)
+{
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+
+ qio_channel_set_blocking(wioc->master, enabled, errp);
+ return 0;
+}
+
+static void qio_channel_websock_set_delay(QIOChannel *ioc,
+ bool enabled)
+{
+ QIOChannelWebsock *tioc = QIO_CHANNEL_WEBSOCK(ioc);
+
+ qio_channel_set_delay(tioc->master, enabled);
+}
+
+static void qio_channel_websock_set_cork(QIOChannel *ioc,
+ bool enabled)
+{
+ QIOChannelWebsock *tioc = QIO_CHANNEL_WEBSOCK(ioc);
+
+ qio_channel_set_cork(tioc->master, enabled);
+}
+
+static int qio_channel_websock_shutdown(QIOChannel *ioc,
+ QIOChannelShutdown how,
+ Error **errp)
+{
+ QIOChannelWebsock *tioc = QIO_CHANNEL_WEBSOCK(ioc);
+
+ return qio_channel_shutdown(tioc->master, how, errp);
+}
+
+static int qio_channel_websock_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+
+ trace_qio_channel_websock_close(ioc);
+ return qio_channel_close(wioc->master, errp);
+}
+
+typedef struct QIOChannelWebsockSource QIOChannelWebsockSource;
+struct QIOChannelWebsockSource {
+ GSource parent;
+ QIOChannelWebsock *wioc;
+ GIOCondition condition;
+};
+
+static gboolean
+qio_channel_websock_source_check(GSource *source)
+{
+ QIOChannelWebsockSource *wsource = (QIOChannelWebsockSource *)source;
+ GIOCondition cond = 0;
+
+ if (wsource->wioc->rawinput.offset) {
+ cond |= G_IO_IN;
+ }
+ if (wsource->wioc->encoutput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER) {
+ cond |= G_IO_OUT;
+ }
+ if (wsource->wioc->io_eof) {
+ cond |= G_IO_HUP;
+ }
+ if (wsource->wioc->io_err) {
+ cond |= G_IO_ERR;
+ }
+
+ return cond & wsource->condition;
+}
+
+static gboolean
+qio_channel_websock_source_prepare(GSource *source,
+ gint *timeout)
+{
+ *timeout = -1;
+ return qio_channel_websock_source_check(source);
+}
+
+static gboolean
+qio_channel_websock_source_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ QIOChannelFunc func = (QIOChannelFunc)callback;
+ QIOChannelWebsockSource *wsource = (QIOChannelWebsockSource *)source;
+
+ return (*func)(QIO_CHANNEL(wsource->wioc),
+ qio_channel_websock_source_check(source),
+ user_data);
+}
+
+static void
+qio_channel_websock_source_finalize(GSource *source)
+{
+ QIOChannelWebsockSource *ssource = (QIOChannelWebsockSource *)source;
+
+ object_unref(OBJECT(ssource->wioc));
+}
+
+GSourceFuncs qio_channel_websock_source_funcs = {
+ qio_channel_websock_source_prepare,
+ qio_channel_websock_source_check,
+ qio_channel_websock_source_dispatch,
+ qio_channel_websock_source_finalize
+};
+
+static GSource *qio_channel_websock_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+ QIOChannelWebsockSource *ssource;
+ GSource *source;
+
+ source = g_source_new(&qio_channel_websock_source_funcs,
+ sizeof(QIOChannelWebsockSource));
+ ssource = (QIOChannelWebsockSource *)source;
+
+ ssource->wioc = wioc;
+ object_ref(OBJECT(wioc));
+
+ ssource->condition = condition;
+
+ qio_channel_websock_set_watch(wioc);
+ return source;
+}
+
+static void qio_channel_websock_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_websock_writev;
+ ioc_klass->io_readv = qio_channel_websock_readv;
+ ioc_klass->io_set_blocking = qio_channel_websock_set_blocking;
+ ioc_klass->io_set_cork = qio_channel_websock_set_cork;
+ ioc_klass->io_set_delay = qio_channel_websock_set_delay;
+ ioc_klass->io_close = qio_channel_websock_close;
+ ioc_klass->io_shutdown = qio_channel_websock_shutdown;
+ ioc_klass->io_create_watch = qio_channel_websock_create_watch;
+}
+
+static const TypeInfo qio_channel_websock_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_WEBSOCK,
+ .instance_size = sizeof(QIOChannelWebsock),
+ .instance_finalize = qio_channel_websock_finalize,
+ .class_init = qio_channel_websock_class_init,
+};
+
+static void qio_channel_websock_register_types(void)
+{
+ type_register_static(&qio_channel_websock_info);
+}
+
+type_init(qio_channel_websock_register_types);
diff --git a/io/channel.c b/io/channel.c
new file mode 100644
index 000000000..e8b019dc3
--- /dev/null
+++ b/io/channel.c
@@ -0,0 +1,623 @@
+/*
+ * QEMU I/O channels
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/channel.h"
+#include "qapi/error.h"
+#include "qemu/main-loop.h"
+#include "qemu/module.h"
+#include "qemu/iov.h"
+
+bool qio_channel_has_feature(QIOChannel *ioc,
+ QIOChannelFeature feature)
+{
+ return ioc->features & (1 << feature);
+}
+
+
+void qio_channel_set_feature(QIOChannel *ioc,
+ QIOChannelFeature feature)
+{
+ ioc->features |= (1 << feature);
+}
+
+
+void qio_channel_set_name(QIOChannel *ioc,
+ const char *name)
+{
+ g_free(ioc->name);
+ ioc->name = g_strdup(name);
+}
+
+
+ssize_t qio_channel_readv_full(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if ((fds || nfds) &&
+ !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
+ error_setg_errno(errp, EINVAL,
+ "Channel does not support file descriptor passing");
+ return -1;
+ }
+
+ return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
+}
+
+
+ssize_t qio_channel_writev_full(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if ((fds || nfds) &&
+ !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
+ error_setg_errno(errp, EINVAL,
+ "Channel does not support file descriptor passing");
+ return -1;
+ }
+
+ return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
+}
+
+
+int qio_channel_readv_all_eof(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ Error **errp)
+{
+ return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
+}
+
+int qio_channel_readv_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ Error **errp)
+{
+ return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
+}
+
+int qio_channel_readv_full_all_eof(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds, size_t *nfds,
+ Error **errp)
+{
+ int ret = -1;
+ struct iovec *local_iov = g_new(struct iovec, niov);
+ struct iovec *local_iov_head = local_iov;
+ unsigned int nlocal_iov = niov;
+ int **local_fds = fds;
+ size_t *local_nfds = nfds;
+ bool partial = false;
+
+ if (nfds) {
+ *nfds = 0;
+ }
+
+ if (fds) {
+ *fds = NULL;
+ }
+
+ nlocal_iov = iov_copy(local_iov, nlocal_iov,
+ iov, niov,
+ 0, iov_size(iov, niov));
+
+ while ((nlocal_iov > 0) || local_fds) {
+ ssize_t len;
+ len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
+ local_nfds, errp);
+ if (len == QIO_CHANNEL_ERR_BLOCK) {
+ if (qemu_in_coroutine()) {
+ qio_channel_yield(ioc, G_IO_IN);
+ } else {
+ qio_channel_wait(ioc, G_IO_IN);
+ }
+ continue;
+ }
+
+ if (len == 0) {
+ if (local_nfds && *local_nfds) {
+ /*
+ * Got some FDs, but no data yet. This isn't an EOF
+ * scenario (yet), so carry on to try to read data
+ * on next loop iteration
+ */
+ goto next_iter;
+ } else if (!partial) {
+ /* No fds and no data - EOF before any data read */
+ ret = 0;
+ goto cleanup;
+ } else {
+ len = -1;
+ error_setg(errp,
+ "Unexpected end-of-file before all data were read");
+ /* Fallthrough into len < 0 handling */
+ }
+ }
+
+ if (len < 0) {
+ /* Close any FDs we previously received */
+ if (nfds && fds) {
+ size_t i;
+ for (i = 0; i < (*nfds); i++) {
+ close((*fds)[i]);
+ }
+ g_free(*fds);
+ *fds = NULL;
+ *nfds = 0;
+ }
+ goto cleanup;
+ }
+
+ if (nlocal_iov) {
+ iov_discard_front(&local_iov, &nlocal_iov, len);
+ }
+
+next_iter:
+ partial = true;
+ local_fds = NULL;
+ local_nfds = NULL;
+ }
+
+ ret = 1;
+
+ cleanup:
+ g_free(local_iov_head);
+ return ret;
+}
+
+int qio_channel_readv_full_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds, size_t *nfds,
+ Error **errp)
+{
+ int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);
+
+ if (ret == 0) {
+ error_setg(errp, "Unexpected end-of-file before all data were read");
+ return -1;
+ }
+ if (ret == 1) {
+ return 0;
+ }
+
+ return ret;
+}
+
+int qio_channel_writev_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ Error **errp)
+{
+ return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
+}
+
+int qio_channel_writev_full_all(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds, size_t nfds,
+ Error **errp)
+{
+ int ret = -1;
+ struct iovec *local_iov = g_new(struct iovec, niov);
+ struct iovec *local_iov_head = local_iov;
+ unsigned int nlocal_iov = niov;
+
+ nlocal_iov = iov_copy(local_iov, nlocal_iov,
+ iov, niov,
+ 0, iov_size(iov, niov));
+
+ while (nlocal_iov > 0) {
+ ssize_t len;
+ len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
+ errp);
+ if (len == QIO_CHANNEL_ERR_BLOCK) {
+ if (qemu_in_coroutine()) {
+ qio_channel_yield(ioc, G_IO_OUT);
+ } else {
+ qio_channel_wait(ioc, G_IO_OUT);
+ }
+ continue;
+ }
+ if (len < 0) {
+ goto cleanup;
+ }
+
+ iov_discard_front(&local_iov, &nlocal_iov, len);
+
+ fds = NULL;
+ nfds = 0;
+ }
+
+ ret = 0;
+ cleanup:
+ g_free(local_iov_head);
+ return ret;
+}
+
+ssize_t qio_channel_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ Error **errp)
+{
+ return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
+}
+
+
+ssize_t qio_channel_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ Error **errp)
+{
+ return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
+}
+
+
+ssize_t qio_channel_read(QIOChannel *ioc,
+ char *buf,
+ size_t buflen,
+ Error **errp)
+{
+ struct iovec iov = { .iov_base = buf, .iov_len = buflen };
+ return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
+}
+
+
+ssize_t qio_channel_write(QIOChannel *ioc,
+ const char *buf,
+ size_t buflen,
+ Error **errp)
+{
+ struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
+ return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
+}
+
+
+int qio_channel_read_all_eof(QIOChannel *ioc,
+ char *buf,
+ size_t buflen,
+ Error **errp)
+{
+ struct iovec iov = { .iov_base = buf, .iov_len = buflen };
+ return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
+}
+
+
+int qio_channel_read_all(QIOChannel *ioc,
+ char *buf,
+ size_t buflen,
+ Error **errp)
+{
+ struct iovec iov = { .iov_base = buf, .iov_len = buflen };
+ return qio_channel_readv_all(ioc, &iov, 1, errp);
+}
+
+
+int qio_channel_write_all(QIOChannel *ioc,
+ const char *buf,
+ size_t buflen,
+ Error **errp)
+{
+ struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
+ return qio_channel_writev_all(ioc, &iov, 1, errp);
+}
+
+
+int qio_channel_set_blocking(QIOChannel *ioc,
+ bool enabled,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+ return klass->io_set_blocking(ioc, enabled, errp);
+}
+
+
+int qio_channel_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+ return klass->io_close(ioc, errp);
+}
+
+
+GSource *qio_channel_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+ GSource *ret = klass->io_create_watch(ioc, condition);
+
+ if (ioc->name) {
+ g_source_set_name(ret, ioc->name);
+ }
+
+ return ret;
+}
+
+
+void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
+}
+
+guint qio_channel_add_watch_full(QIOChannel *ioc,
+ GIOCondition condition,
+ QIOChannelFunc func,
+ gpointer user_data,
+ GDestroyNotify notify,
+ GMainContext *context)
+{
+ GSource *source;
+ guint id;
+
+ source = qio_channel_create_watch(ioc, condition);
+
+ g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
+
+ id = g_source_attach(source, context);
+ g_source_unref(source);
+
+ return id;
+}
+
+guint qio_channel_add_watch(QIOChannel *ioc,
+ GIOCondition condition,
+ QIOChannelFunc func,
+ gpointer user_data,
+ GDestroyNotify notify)
+{
+ return qio_channel_add_watch_full(ioc, condition, func,
+ user_data, notify, NULL);
+}
+
+GSource *qio_channel_add_watch_source(QIOChannel *ioc,
+ GIOCondition condition,
+ QIOChannelFunc func,
+ gpointer user_data,
+ GDestroyNotify notify,
+ GMainContext *context)
+{
+ GSource *source;
+ guint id;
+
+ id = qio_channel_add_watch_full(ioc, condition, func,
+ user_data, notify, context);
+ source = g_main_context_find_source_by_id(context, id);
+ g_source_ref(source);
+ return source;
+}
+
+
+int qio_channel_shutdown(QIOChannel *ioc,
+ QIOChannelShutdown how,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if (!klass->io_shutdown) {
+ error_setg(errp, "Data path shutdown not supported");
+ return -1;
+ }
+
+ return klass->io_shutdown(ioc, how, errp);
+}
+
+
+void qio_channel_set_delay(QIOChannel *ioc,
+ bool enabled)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if (klass->io_set_delay) {
+ klass->io_set_delay(ioc, enabled);
+ }
+}
+
+
+void qio_channel_set_cork(QIOChannel *ioc,
+ bool enabled)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if (klass->io_set_cork) {
+ klass->io_set_cork(ioc, enabled);
+ }
+}
+
+
+off_t qio_channel_io_seek(QIOChannel *ioc,
+ off_t offset,
+ int whence,
+ Error **errp)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ if (!klass->io_seek) {
+ error_setg(errp, "Channel does not support random access");
+ return -1;
+ }
+
+ return klass->io_seek(ioc, offset, whence, errp);
+}
+
+
+static void qio_channel_restart_read(void *opaque)
+{
+ QIOChannel *ioc = opaque;
+ Coroutine *co = ioc->read_coroutine;
+
+ /* Assert that aio_co_wake() reenters the coroutine directly */
+ assert(qemu_get_current_aio_context() ==
+ qemu_coroutine_get_aio_context(co));
+ aio_co_wake(co);
+}
+
+static void qio_channel_restart_write(void *opaque)
+{
+ QIOChannel *ioc = opaque;
+ Coroutine *co = ioc->write_coroutine;
+
+ /* Assert that aio_co_wake() reenters the coroutine directly */
+ assert(qemu_get_current_aio_context() ==
+ qemu_coroutine_get_aio_context(co));
+ aio_co_wake(co);
+}
+
+static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
+{
+ IOHandler *rd_handler = NULL, *wr_handler = NULL;
+ AioContext *ctx;
+
+ if (ioc->read_coroutine) {
+ rd_handler = qio_channel_restart_read;
+ }
+ if (ioc->write_coroutine) {
+ wr_handler = qio_channel_restart_write;
+ }
+
+ ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
+ qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
+}
+
+void qio_channel_attach_aio_context(QIOChannel *ioc,
+ AioContext *ctx)
+{
+ assert(!ioc->read_coroutine);
+ assert(!ioc->write_coroutine);
+ ioc->ctx = ctx;
+}
+
+void qio_channel_detach_aio_context(QIOChannel *ioc)
+{
+ ioc->read_coroutine = NULL;
+ ioc->write_coroutine = NULL;
+ qio_channel_set_aio_fd_handlers(ioc);
+ ioc->ctx = NULL;
+}
+
+void coroutine_fn qio_channel_yield(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ assert(qemu_in_coroutine());
+ if (condition == G_IO_IN) {
+ assert(!ioc->read_coroutine);
+ ioc->read_coroutine = qemu_coroutine_self();
+ } else if (condition == G_IO_OUT) {
+ assert(!ioc->write_coroutine);
+ ioc->write_coroutine = qemu_coroutine_self();
+ } else {
+ abort();
+ }
+ qio_channel_set_aio_fd_handlers(ioc);
+ qemu_coroutine_yield();
+
+ /* Allow interrupting the operation by reentering the coroutine other than
+ * through the aio_fd_handlers. */
+ if (condition == G_IO_IN && ioc->read_coroutine) {
+ ioc->read_coroutine = NULL;
+ qio_channel_set_aio_fd_handlers(ioc);
+ } else if (condition == G_IO_OUT && ioc->write_coroutine) {
+ ioc->write_coroutine = NULL;
+ qio_channel_set_aio_fd_handlers(ioc);
+ }
+}
+
+
+static gboolean qio_channel_wait_complete(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer opaque)
+{
+ GMainLoop *loop = opaque;
+
+ g_main_loop_quit(loop);
+ return FALSE;
+}
+
+
+void qio_channel_wait(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ GMainContext *ctxt = g_main_context_new();
+ GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
+ GSource *source;
+
+ source = qio_channel_create_watch(ioc, condition);
+
+ g_source_set_callback(source,
+ (GSourceFunc)qio_channel_wait_complete,
+ loop,
+ NULL);
+
+ g_source_attach(source, ctxt);
+
+ g_main_loop_run(loop);
+
+ g_source_unref(source);
+ g_main_loop_unref(loop);
+ g_main_context_unref(ctxt);
+}
+
+
+static void qio_channel_finalize(Object *obj)
+{
+ QIOChannel *ioc = QIO_CHANNEL(obj);
+
+ g_free(ioc->name);
+
+#ifdef _WIN32
+ if (ioc->event) {
+ CloseHandle(ioc->event);
+ }
+#endif
+}
+
+static const TypeInfo qio_channel_info = {
+ .parent = TYPE_OBJECT,
+ .name = TYPE_QIO_CHANNEL,
+ .instance_size = sizeof(QIOChannel),
+ .instance_finalize = qio_channel_finalize,
+ .abstract = true,
+ .class_size = sizeof(QIOChannelClass),
+};
+
+
+static void qio_channel_register_types(void)
+{
+ type_register_static(&qio_channel_info);
+}
+
+
+type_init(qio_channel_register_types);
diff --git a/io/dns-resolver.c b/io/dns-resolver.c
new file mode 100644
index 000000000..53b0e8407
--- /dev/null
+++ b/io/dns-resolver.c
@@ -0,0 +1,283 @@
+/*
+ * QEMU DNS resolver
+ *
+ * Copyright (c) 2016 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/dns-resolver.h"
+#include "qapi/clone-visitor.h"
+#include "qapi/qapi-visit-sockets.h"
+#include "qemu/sockets.h"
+#include "qapi/error.h"
+#include "qemu/cutils.h"
+#include "qemu/module.h"
+
+#ifndef AI_NUMERICSERV
+# define AI_NUMERICSERV 0
+#endif
+
+static QIODNSResolver *instance;
+static GOnce instance_init = G_ONCE_INIT;
+
+static gpointer qio_dns_resolve_init_instance(gpointer unused G_GNUC_UNUSED)
+{
+ instance = QIO_DNS_RESOLVER(object_new(TYPE_QIO_DNS_RESOLVER));
+ return NULL;
+}
+
+QIODNSResolver *qio_dns_resolver_get_instance(void)
+{
+ g_once(&instance_init, qio_dns_resolve_init_instance, NULL);
+ return instance;
+}
+
+static int qio_dns_resolver_lookup_sync_inet(QIODNSResolver *resolver,
+ SocketAddress *addr,
+ size_t *naddrs,
+ SocketAddress ***addrs,
+ Error **errp)
+{
+ struct addrinfo ai, *res, *e;
+ InetSocketAddress *iaddr = &addr->u.inet;
+ char port[33];
+ char uaddr[INET6_ADDRSTRLEN + 1];
+ char uport[33];
+ int rc;
+ Error *err = NULL;
+ size_t i;
+
+ *naddrs = 0;
+ *addrs = NULL;
+
+ memset(&ai, 0, sizeof(ai));
+ ai.ai_flags = AI_PASSIVE;
+ if (iaddr->has_numeric && iaddr->numeric) {
+ ai.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV;
+ }
+ ai.ai_family = inet_ai_family_from_address(iaddr, &err);
+ ai.ai_socktype = SOCK_STREAM;
+
+ if (err) {
+ error_propagate(errp, err);
+ return -1;
+ }
+
+ if (iaddr->host == NULL) {
+ error_setg(errp, "host not specified");
+ return -1;
+ }
+ if (iaddr->port != NULL) {
+ pstrcpy(port, sizeof(port), iaddr->port);
+ } else {
+ port[0] = '\0';
+ }
+
+ rc = getaddrinfo(strlen(iaddr->host) ? iaddr->host : NULL,
+ strlen(port) ? port : NULL, &ai, &res);
+ if (rc != 0) {
+ error_setg(errp, "address resolution failed for %s:%s: %s",
+ iaddr->host, port, gai_strerror(rc));
+ return -1;
+ }
+
+ for (e = res; e != NULL; e = e->ai_next) {
+ (*naddrs)++;
+ }
+
+ *addrs = g_new0(SocketAddress *, *naddrs);
+
+ /* create socket + bind */
+ for (i = 0, e = res; e != NULL; i++, e = e->ai_next) {
+ SocketAddress *newaddr = g_new0(SocketAddress, 1);
+
+ newaddr->type = SOCKET_ADDRESS_TYPE_INET;
+
+ getnameinfo((struct sockaddr *)e->ai_addr, e->ai_addrlen,
+ uaddr, INET6_ADDRSTRLEN, uport, 32,
+ NI_NUMERICHOST | NI_NUMERICSERV);
+
+ newaddr->u.inet = (InetSocketAddress){
+ .host = g_strdup(uaddr),
+ .port = g_strdup(uport),
+ .has_numeric = true,
+ .numeric = true,
+ .has_to = iaddr->has_to,
+ .to = iaddr->to,
+ .has_ipv4 = iaddr->has_ipv4,
+ .ipv4 = iaddr->ipv4,
+ .has_ipv6 = iaddr->has_ipv6,
+ .ipv6 = iaddr->ipv6,
+#ifdef HAVE_IPPROTO_MPTCP
+ .has_mptcp = iaddr->has_mptcp,
+ .mptcp = iaddr->mptcp,
+#endif
+ };
+
+ (*addrs)[i] = newaddr;
+ }
+ freeaddrinfo(res);
+ return 0;
+}
+
+
+static int qio_dns_resolver_lookup_sync_nop(QIODNSResolver *resolver,
+ SocketAddress *addr,
+ size_t *naddrs,
+ SocketAddress ***addrs,
+ Error **errp)
+{
+ *naddrs = 1;
+ *addrs = g_new0(SocketAddress *, 1);
+ (*addrs)[0] = QAPI_CLONE(SocketAddress, addr);
+
+ return 0;
+}
+
+
+int qio_dns_resolver_lookup_sync(QIODNSResolver *resolver,
+ SocketAddress *addr,
+ size_t *naddrs,
+ SocketAddress ***addrs,
+ Error **errp)
+{
+ switch (addr->type) {
+ case SOCKET_ADDRESS_TYPE_INET:
+ return qio_dns_resolver_lookup_sync_inet(resolver,
+ addr,
+ naddrs,
+ addrs,
+ errp);
+
+ case SOCKET_ADDRESS_TYPE_UNIX:
+ case SOCKET_ADDRESS_TYPE_VSOCK:
+ case SOCKET_ADDRESS_TYPE_FD:
+ return qio_dns_resolver_lookup_sync_nop(resolver,
+ addr,
+ naddrs,
+ addrs,
+ errp);
+
+ default:
+ abort();
+ }
+}
+
+
+struct QIODNSResolverLookupData {
+ SocketAddress *addr;
+ SocketAddress **addrs;
+ size_t naddrs;
+};
+
+
+static void qio_dns_resolver_lookup_data_free(gpointer opaque)
+{
+ struct QIODNSResolverLookupData *data = opaque;
+ size_t i;
+
+ qapi_free_SocketAddress(data->addr);
+ for (i = 0; i < data->naddrs; i++) {
+ qapi_free_SocketAddress(data->addrs[i]);
+ }
+
+ g_free(data->addrs);
+ g_free(data);
+}
+
+
+static void qio_dns_resolver_lookup_worker(QIOTask *task,
+ gpointer opaque)
+{
+ QIODNSResolver *resolver = QIO_DNS_RESOLVER(qio_task_get_source(task));
+ struct QIODNSResolverLookupData *data = opaque;
+ Error *err = NULL;
+
+ qio_dns_resolver_lookup_sync(resolver,
+ data->addr,
+ &data->naddrs,
+ &data->addrs,
+ &err);
+ if (err) {
+ qio_task_set_error(task, err);
+ } else {
+ qio_task_set_result_pointer(task, opaque, NULL);
+ }
+
+ object_unref(OBJECT(resolver));
+}
+
+
+void qio_dns_resolver_lookup_async(QIODNSResolver *resolver,
+ SocketAddress *addr,
+ QIOTaskFunc func,
+ gpointer opaque,
+ GDestroyNotify notify)
+{
+ QIOTask *task;
+ struct QIODNSResolverLookupData *data =
+ g_new0(struct QIODNSResolverLookupData, 1);
+
+ data->addr = QAPI_CLONE(SocketAddress, addr);
+
+ task = qio_task_new(OBJECT(resolver), func, opaque, notify);
+
+ qio_task_run_in_thread(task,
+ qio_dns_resolver_lookup_worker,
+ data,
+ qio_dns_resolver_lookup_data_free,
+ NULL);
+}
+
+
+void qio_dns_resolver_lookup_result(QIODNSResolver *resolver,
+ QIOTask *task,
+ size_t *naddrs,
+ SocketAddress ***addrs)
+{
+ struct QIODNSResolverLookupData *data =
+ qio_task_get_result_pointer(task);
+ size_t i;
+
+ *naddrs = 0;
+ *addrs = NULL;
+ if (!data) {
+ return;
+ }
+
+ *naddrs = data->naddrs;
+ *addrs = g_new0(SocketAddress *, data->naddrs);
+ for (i = 0; i < data->naddrs; i++) {
+ (*addrs)[i] = QAPI_CLONE(SocketAddress, data->addrs[i]);
+ }
+}
+
+
+static const TypeInfo qio_dns_resolver_info = {
+ .parent = TYPE_OBJECT,
+ .name = TYPE_QIO_DNS_RESOLVER,
+ .instance_size = sizeof(QIODNSResolver),
+};
+
+
+static void qio_dns_resolver_register_types(void)
+{
+ type_register_static(&qio_dns_resolver_info);
+}
+
+
+type_init(qio_dns_resolver_register_types);
diff --git a/io/meson.build b/io/meson.build
new file mode 100644
index 000000000..bbcd3c53a
--- /dev/null
+++ b/io/meson.build
@@ -0,0 +1,15 @@
+io_ss.add(genh)
+io_ss.add(files(
+ 'channel-buffer.c',
+ 'channel-command.c',
+ 'channel-file.c',
+ 'channel-socket.c',
+ 'channel-tls.c',
+ 'channel-util.c',
+ 'channel-watch.c',
+ 'channel-websock.c',
+ 'channel.c',
+ 'dns-resolver.c',
+ 'net-listener.c',
+ 'task.c',
+), gnutls)
diff --git a/io/net-listener.c b/io/net-listener.c
new file mode 100644
index 000000000..1c984d69c
--- /dev/null
+++ b/io/net-listener.c
@@ -0,0 +1,322 @@
+/*
+ * QEMU network listener
+ *
+ * Copyright (c) 2016-2017 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/net-listener.h"
+#include "io/dns-resolver.h"
+#include "qapi/error.h"
+#include "qemu/module.h"
+
+QIONetListener *qio_net_listener_new(void)
+{
+ return QIO_NET_LISTENER(object_new(TYPE_QIO_NET_LISTENER));
+}
+
+void qio_net_listener_set_name(QIONetListener *listener,
+ const char *name)
+{
+ g_free(listener->name);
+ listener->name = g_strdup(name);
+}
+
+
+static gboolean qio_net_listener_channel_func(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer opaque)
+{
+ QIONetListener *listener = QIO_NET_LISTENER(opaque);
+ QIOChannelSocket *sioc;
+
+ sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
+ NULL);
+ if (!sioc) {
+ return TRUE;
+ }
+
+ if (listener->io_func) {
+ listener->io_func(listener, sioc, listener->io_data);
+ }
+
+ object_unref(OBJECT(sioc));
+
+ return TRUE;
+}
+
+
+int qio_net_listener_open_sync(QIONetListener *listener,
+ SocketAddress *addr,
+ int num,
+ Error **errp)
+{
+ QIODNSResolver *resolver = qio_dns_resolver_get_instance();
+ SocketAddress **resaddrs;
+ size_t nresaddrs;
+ size_t i;
+ Error *err = NULL;
+ bool success = false;
+
+ if (qio_dns_resolver_lookup_sync(resolver,
+ addr,
+ &nresaddrs,
+ &resaddrs,
+ errp) < 0) {
+ return -1;
+ }
+
+ for (i = 0; i < nresaddrs; i++) {
+ QIOChannelSocket *sioc = qio_channel_socket_new();
+
+ if (qio_channel_socket_listen_sync(sioc, resaddrs[i], num,
+ err ? NULL : &err) == 0) {
+ success = true;
+
+ qio_net_listener_add(listener, sioc);
+ }
+
+ qapi_free_SocketAddress(resaddrs[i]);
+ object_unref(OBJECT(sioc));
+ }
+ g_free(resaddrs);
+
+ if (success) {
+ error_free(err);
+ return 0;
+ } else {
+ error_propagate(errp, err);
+ return -1;
+ }
+}
+
+
+void qio_net_listener_add(QIONetListener *listener,
+ QIOChannelSocket *sioc)
+{
+ if (listener->name) {
+ char *name = g_strdup_printf("%s-listen", listener->name);
+ qio_channel_set_name(QIO_CHANNEL(sioc), name);
+ g_free(name);
+ }
+
+ listener->sioc = g_renew(QIOChannelSocket *, listener->sioc,
+ listener->nsioc + 1);
+ listener->io_source = g_renew(typeof(listener->io_source[0]),
+ listener->io_source,
+ listener->nsioc + 1);
+ listener->sioc[listener->nsioc] = sioc;
+ listener->io_source[listener->nsioc] = NULL;
+
+ object_ref(OBJECT(sioc));
+ listener->connected = true;
+
+ if (listener->io_func != NULL) {
+ object_ref(OBJECT(listener));
+ listener->io_source[listener->nsioc] = qio_channel_add_watch_source(
+ QIO_CHANNEL(listener->sioc[listener->nsioc]), G_IO_IN,
+ qio_net_listener_channel_func,
+ listener, (GDestroyNotify)object_unref, NULL);
+ }
+
+ listener->nsioc++;
+}
+
+
+void qio_net_listener_set_client_func_full(QIONetListener *listener,
+ QIONetListenerClientFunc func,
+ gpointer data,
+ GDestroyNotify notify,
+ GMainContext *context)
+{
+ size_t i;
+
+ if (listener->io_notify) {
+ listener->io_notify(listener->io_data);
+ }
+ listener->io_func = func;
+ listener->io_data = data;
+ listener->io_notify = notify;
+
+ for (i = 0; i < listener->nsioc; i++) {
+ if (listener->io_source[i]) {
+ g_source_destroy(listener->io_source[i]);
+ g_source_unref(listener->io_source[i]);
+ listener->io_source[i] = NULL;
+ }
+ }
+
+ if (listener->io_func != NULL) {
+ for (i = 0; i < listener->nsioc; i++) {
+ object_ref(OBJECT(listener));
+ listener->io_source[i] = qio_channel_add_watch_source(
+ QIO_CHANNEL(listener->sioc[i]), G_IO_IN,
+ qio_net_listener_channel_func,
+ listener, (GDestroyNotify)object_unref, context);
+ }
+ }
+}
+
+void qio_net_listener_set_client_func(QIONetListener *listener,
+ QIONetListenerClientFunc func,
+ gpointer data,
+ GDestroyNotify notify)
+{
+ qio_net_listener_set_client_func_full(listener, func, data,
+ notify, NULL);
+}
+
+struct QIONetListenerClientWaitData {
+ QIOChannelSocket *sioc;
+ GMainLoop *loop;
+};
+
+
+static gboolean qio_net_listener_wait_client_func(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer opaque)
+{
+ struct QIONetListenerClientWaitData *data = opaque;
+ QIOChannelSocket *sioc;
+
+ sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
+ NULL);
+ if (!sioc) {
+ return TRUE;
+ }
+
+ if (data->sioc) {
+ object_unref(OBJECT(sioc));
+ } else {
+ data->sioc = sioc;
+ g_main_loop_quit(data->loop);
+ }
+
+ return TRUE;
+}
+
+QIOChannelSocket *qio_net_listener_wait_client(QIONetListener *listener)
+{
+ GMainContext *ctxt = g_main_context_new();
+ GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
+ GSource **sources;
+ struct QIONetListenerClientWaitData data = {
+ .sioc = NULL,
+ .loop = loop
+ };
+ size_t i;
+
+ for (i = 0; i < listener->nsioc; i++) {
+ if (listener->io_source[i]) {
+ g_source_destroy(listener->io_source[i]);
+ g_source_unref(listener->io_source[i]);
+ listener->io_source[i] = NULL;
+ }
+ }
+
+ sources = g_new0(GSource *, listener->nsioc);
+ for (i = 0; i < listener->nsioc; i++) {
+ sources[i] = qio_channel_create_watch(QIO_CHANNEL(listener->sioc[i]),
+ G_IO_IN);
+
+ g_source_set_callback(sources[i],
+ (GSourceFunc)qio_net_listener_wait_client_func,
+ &data,
+ NULL);
+ g_source_attach(sources[i], ctxt);
+ }
+
+ g_main_loop_run(loop);
+
+ for (i = 0; i < listener->nsioc; i++) {
+ g_source_unref(sources[i]);
+ }
+ g_free(sources);
+ g_main_loop_unref(loop);
+ g_main_context_unref(ctxt);
+
+ if (listener->io_func != NULL) {
+ for (i = 0; i < listener->nsioc; i++) {
+ object_ref(OBJECT(listener));
+ listener->io_source[i] = qio_channel_add_watch_source(
+ QIO_CHANNEL(listener->sioc[i]), G_IO_IN,
+ qio_net_listener_channel_func,
+ listener, (GDestroyNotify)object_unref, NULL);
+ }
+ }
+
+ return data.sioc;
+}
+
+void qio_net_listener_disconnect(QIONetListener *listener)
+{
+ size_t i;
+
+ if (!listener->connected) {
+ return;
+ }
+
+ for (i = 0; i < listener->nsioc; i++) {
+ if (listener->io_source[i]) {
+ g_source_destroy(listener->io_source[i]);
+ g_source_unref(listener->io_source[i]);
+ listener->io_source[i] = NULL;
+ }
+ qio_channel_close(QIO_CHANNEL(listener->sioc[i]), NULL);
+ }
+ listener->connected = false;
+}
+
+
+bool qio_net_listener_is_connected(QIONetListener *listener)
+{
+ return listener->connected;
+}
+
+static void qio_net_listener_finalize(Object *obj)
+{
+ QIONetListener *listener = QIO_NET_LISTENER(obj);
+ size_t i;
+
+ if (listener->io_notify) {
+ listener->io_notify(listener->io_data);
+ }
+ qio_net_listener_disconnect(listener);
+
+ for (i = 0; i < listener->nsioc; i++) {
+ object_unref(OBJECT(listener->sioc[i]));
+ }
+ g_free(listener->io_source);
+ g_free(listener->sioc);
+ g_free(listener->name);
+}
+
+static const TypeInfo qio_net_listener_info = {
+ .parent = TYPE_OBJECT,
+ .name = TYPE_QIO_NET_LISTENER,
+ .instance_size = sizeof(QIONetListener),
+ .instance_finalize = qio_net_listener_finalize,
+};
+
+
+static void qio_net_listener_register_types(void)
+{
+ type_register_static(&qio_net_listener_info);
+}
+
+
+type_init(qio_net_listener_register_types);
diff --git a/io/task.c b/io/task.c
new file mode 100644
index 000000000..451f26f8b
--- /dev/null
+++ b/io/task.c
@@ -0,0 +1,241 @@
+/*
+ * QEMU I/O task
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/task.h"
+#include "qapi/error.h"
+#include "qemu/thread.h"
+#include "qom/object.h"
+#include "trace.h"
+
+struct QIOTaskThreadData {
+ QIOTaskWorker worker;
+ gpointer opaque;
+ GDestroyNotify destroy;
+ GMainContext *context;
+ GSource *completion;
+};
+
+
+struct QIOTask {
+ Object *source;
+ QIOTaskFunc func;
+ gpointer opaque;
+ GDestroyNotify destroy;
+ Error *err;
+ gpointer result;
+ GDestroyNotify destroyResult;
+ QemuMutex thread_lock;
+ QemuCond thread_cond;
+ struct QIOTaskThreadData *thread;
+};
+
+
+QIOTask *qio_task_new(Object *source,
+ QIOTaskFunc func,
+ gpointer opaque,
+ GDestroyNotify destroy)
+{
+ QIOTask *task;
+
+ task = g_new0(QIOTask, 1);
+
+ task->source = source;
+ object_ref(source);
+ task->func = func;
+ task->opaque = opaque;
+ task->destroy = destroy;
+ qemu_mutex_init(&task->thread_lock);
+ qemu_cond_init(&task->thread_cond);
+
+ trace_qio_task_new(task, source, func, opaque);
+
+ return task;
+}
+
+static void qio_task_free(QIOTask *task)
+{
+ qemu_mutex_lock(&task->thread_lock);
+ if (task->thread) {
+ if (task->thread->destroy) {
+ task->thread->destroy(task->thread->opaque);
+ }
+
+ if (task->thread->context) {
+ g_main_context_unref(task->thread->context);
+ }
+
+ g_free(task->thread);
+ }
+
+ if (task->destroy) {
+ task->destroy(task->opaque);
+ }
+ if (task->destroyResult) {
+ task->destroyResult(task->result);
+ }
+ if (task->err) {
+ error_free(task->err);
+ }
+ object_unref(task->source);
+
+ qemu_mutex_unlock(&task->thread_lock);
+ qemu_mutex_destroy(&task->thread_lock);
+ qemu_cond_destroy(&task->thread_cond);
+
+ g_free(task);
+}
+
+
+static gboolean qio_task_thread_result(gpointer opaque)
+{
+ QIOTask *task = opaque;
+
+ trace_qio_task_thread_result(task);
+ qio_task_complete(task);
+
+ return FALSE;
+}
+
+
+static gpointer qio_task_thread_worker(gpointer opaque)
+{
+ QIOTask *task = opaque;
+
+ trace_qio_task_thread_run(task);
+
+ task->thread->worker(task, task->thread->opaque);
+
+ /* We're running in the background thread, and must only
+ * ever report the task results in the main event loop
+ * thread. So we schedule an idle callback to report
+ * the worker results
+ */
+ trace_qio_task_thread_exit(task);
+
+ qemu_mutex_lock(&task->thread_lock);
+
+ task->thread->completion = g_idle_source_new();
+ g_source_set_callback(task->thread->completion,
+ qio_task_thread_result, task, NULL);
+ g_source_attach(task->thread->completion,
+ task->thread->context);
+ g_source_unref(task->thread->completion);
+ trace_qio_task_thread_source_attach(task, task->thread->completion);
+
+ qemu_cond_signal(&task->thread_cond);
+ qemu_mutex_unlock(&task->thread_lock);
+
+ return NULL;
+}
+
+
+void qio_task_run_in_thread(QIOTask *task,
+ QIOTaskWorker worker,
+ gpointer opaque,
+ GDestroyNotify destroy,
+ GMainContext *context)
+{
+ struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
+ QemuThread thread;
+
+ if (context) {
+ g_main_context_ref(context);
+ }
+
+ data->worker = worker;
+ data->opaque = opaque;
+ data->destroy = destroy;
+ data->context = context;
+
+ task->thread = data;
+
+ trace_qio_task_thread_start(task, worker, opaque);
+ qemu_thread_create(&thread,
+ "io-task-worker",
+ qio_task_thread_worker,
+ task,
+ QEMU_THREAD_DETACHED);
+}
+
+
+void qio_task_wait_thread(QIOTask *task)
+{
+ qemu_mutex_lock(&task->thread_lock);
+ g_assert(task->thread != NULL);
+ while (task->thread->completion == NULL) {
+ qemu_cond_wait(&task->thread_cond, &task->thread_lock);
+ }
+
+ trace_qio_task_thread_source_cancel(task, task->thread->completion);
+ g_source_destroy(task->thread->completion);
+ qemu_mutex_unlock(&task->thread_lock);
+
+ qio_task_thread_result(task);
+}
+
+
+void qio_task_complete(QIOTask *task)
+{
+ task->func(task, task->opaque);
+ trace_qio_task_complete(task);
+ qio_task_free(task);
+}
+
+
+void qio_task_set_error(QIOTask *task,
+ Error *err)
+{
+ error_propagate(&task->err, err);
+}
+
+
+bool qio_task_propagate_error(QIOTask *task,
+ Error **errp)
+{
+ if (task->err) {
+ error_propagate(errp, task->err);
+ task->err = NULL;
+ return true;
+ }
+
+ return false;
+}
+
+
+void qio_task_set_result_pointer(QIOTask *task,
+ gpointer result,
+ GDestroyNotify destroy)
+{
+ task->result = result;
+ task->destroyResult = destroy;
+}
+
+
+gpointer qio_task_get_result_pointer(QIOTask *task)
+{
+ return task->result;
+}
+
+
+Object *qio_task_get_source(QIOTask *task)
+{
+ return task->source;
+}
diff --git a/io/trace-events b/io/trace-events
new file mode 100644
index 000000000..c5e814eb4
--- /dev/null
+++ b/io/trace-events
@@ -0,0 +1,65 @@
+# See docs/devel/tracing.rst for syntax documentation.
+
+# task.c
+qio_task_new(void *task, void *source, void *func, void *opaque) "Task new task=%p source=%p func=%p opaque=%p"
+qio_task_complete(void *task) "Task complete task=%p"
+qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start task=%p worker=%p opaque=%p"
+qio_task_thread_run(void *task) "Task thread run task=%p"
+qio_task_thread_exit(void *task) "Task thread exit task=%p"
+qio_task_thread_result(void *task) "Task thread result task=%p"
+qio_task_thread_source_attach(void *task, void *source) "Task thread source attach task=%p source=%p"
+qio_task_thread_source_cancel(void *task, void *source) "Task thread source cancel task=%p source=%p"
+
+# channel-socket.c
+qio_channel_socket_new(void *ioc) "Socket new ioc=%p"
+qio_channel_socket_new_fd(void *ioc, int fd) "Socket new ioc=%p fd=%d"
+qio_channel_socket_connect_sync(void *ioc, void *addr) "Socket connect sync ioc=%p addr=%p"
+qio_channel_socket_connect_async(void *ioc, void *addr) "Socket connect async ioc=%p addr=%p"
+qio_channel_socket_connect_fail(void *ioc) "Socket connect fail ioc=%p"
+qio_channel_socket_connect_complete(void *ioc, int fd) "Socket connect complete ioc=%p fd=%d"
+qio_channel_socket_listen_sync(void *ioc, void *addr, int num) "Socket listen sync ioc=%p addr=%p num=%d"
+qio_channel_socket_listen_async(void *ioc, void *addr, int num) "Socket listen async ioc=%p addr=%p num=%d"
+qio_channel_socket_listen_fail(void *ioc) "Socket listen fail ioc=%p"
+qio_channel_socket_listen_complete(void *ioc, int fd) "Socket listen complete ioc=%p fd=%d"
+qio_channel_socket_dgram_sync(void *ioc, void *localAddr, void *remoteAddr) "Socket dgram sync ioc=%p localAddr=%p remoteAddr=%p"
+qio_channel_socket_dgram_async(void *ioc, void *localAddr, void *remoteAddr) "Socket dgram async ioc=%p localAddr=%p remoteAddr=%p"
+qio_channel_socket_dgram_fail(void *ioc) "Socket dgram fail ioc=%p"
+qio_channel_socket_dgram_complete(void *ioc, int fd) "Socket dgram complete ioc=%p fd=%d"
+qio_channel_socket_accept(void *ioc) "Socket accept start ioc=%p"
+qio_channel_socket_accept_fail(void *ioc) "Socket accept fail ioc=%p"
+qio_channel_socket_accept_complete(void *ioc, void *cioc, int fd) "Socket accept complete ioc=%p cioc=%p fd=%d"
+
+# channel-file.c
+qio_channel_file_new_fd(void *ioc, int fd) "File new fd ioc=%p fd=%d"
+qio_channel_file_new_path(void *ioc, const char *path, int flags, int mode, int fd) "File new fd ioc=%p path=%s flags=%d mode=%d fd=%d"
+
+# channel-tls.c
+qio_channel_tls_new_client(void *ioc, void *master, void *creds, const char *hostname) "TLS new client ioc=%p master=%p creds=%p hostname=%s"
+qio_channel_tls_new_server(void *ioc, void *master, void *creds, const char *aclname) "TLS new client ioc=%p master=%p creds=%p acltname=%s"
+qio_channel_tls_handshake_start(void *ioc) "TLS handshake start ioc=%p"
+qio_channel_tls_handshake_pending(void *ioc, int status) "TLS handshake pending ioc=%p status=%d"
+qio_channel_tls_handshake_fail(void *ioc) "TLS handshake fail ioc=%p"
+qio_channel_tls_handshake_complete(void *ioc) "TLS handshake complete ioc=%p"
+qio_channel_tls_credentials_allow(void *ioc) "TLS credentials allow ioc=%p"
+qio_channel_tls_credentials_deny(void *ioc) "TLS credentials deny ioc=%p"
+
+# channel-websock.c
+qio_channel_websock_new_server(void *ioc, void *master) "Websock new client ioc=%p master=%p"
+qio_channel_websock_handshake_start(void *ioc) "Websock handshake start ioc=%p"
+qio_channel_websock_handshake_pending(void *ioc, int status) "Websock handshake pending ioc=%p status=%d"
+qio_channel_websock_handshake_reply(void *ioc) "Websock handshake reply ioc=%p"
+qio_channel_websock_handshake_fail(void *ioc, const char *msg) "Websock handshake fail ioc=%p err=%s"
+qio_channel_websock_handshake_complete(void *ioc) "Websock handshake complete ioc=%p"
+qio_channel_websock_http_greeting(void *ioc, const char *greeting) "Websocket HTTP request ioc=%p greeting='%s'"
+qio_channel_websock_http_request(void *ioc, const char *protocols, const char *version, const char *host, const char *connection, const char *upgrade, const char *key) "Websocket HTTP request ioc=%p protocols='%s' version='%s' host='%s' connection='%s' upgrade='%s' key='%s'"
+qio_channel_websock_header_partial_decode(void *ioc, size_t payloadlen, unsigned char fin, unsigned char opcode, unsigned char has_mask) "Websocket header decoded ioc=%p payload-len=%zu fin=0x%x opcode=0x%x has_mask=0x%x"
+qio_channel_websock_header_full_decode(void *ioc, size_t headerlen, size_t payloadlen, uint32_t mask) "Websocket header decoded ioc=%p header-len=%zu payload-len=%zu mask=0x%x"
+qio_channel_websock_payload_decode(void *ioc, uint8_t opcode, size_t payload_remain) "Websocket header decoded ioc=%p opcode=0x%x payload-remain=%zu"
+qio_channel_websock_encode(void *ioc, uint8_t opcode, size_t payloadlen, size_t headerlen) "Websocket encoded ioc=%p opcode=0x%x header-len=%zu payload-len=%zu"
+qio_channel_websock_close(void *ioc) "Websocket close ioc=%p"
+
+# channel-command.c
+qio_channel_command_new_pid(void *ioc, int writefd, int readfd, int pid) "Command new pid ioc=%p writefd=%d readfd=%d pid=%d"
+qio_channel_command_new_spawn(void *ioc, const char *binary, int flags) "Command new spawn ioc=%p binary=%s flags=%d"
+qio_channel_command_abort(void *ioc, int pid) "Command abort ioc=%p pid=%d"
+qio_channel_command_wait(void *ioc, int pid, int ret, int status) "Command abort ioc=%p pid=%d ret=%d status=%d"
diff --git a/io/trace.h b/io/trace.h
new file mode 100644
index 000000000..92d63a5bf
--- /dev/null
+++ b/io/trace.h
@@ -0,0 +1 @@
+#include "trace/trace-io.h"