aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorTimos Ampelikiotis <t.ampelikiotis@virtualopensystems.com>2023-10-10 11:40:56 +0000
committerTimos Ampelikiotis <t.ampelikiotis@virtualopensystems.com>2023-10-10 11:40:56 +0000
commite02cda008591317b1625707ff8e115a4841aa889 (patch)
treeaee302e3cf8b59ec2d32ec481be3d1afddfc8968 /python
parentcc668e6b7e0ffd8c9d130513d12053cf5eda1d3b (diff)
Introduce Virtio-loopback epsilon release:
Epsilon release introduces a new compatibility layer which makes the virtio-loopback design work with QEMU and the rust-vmm vhost-user backend without requiring any changes. Signed-off-by: Timos Ampelikiotis <t.ampelikiotis@virtualopensystems.com> Change-Id: I52e57563e08a7d0bdc002f8e928ee61ba0c53dd9
Diffstat (limited to 'python')
-rw-r--r--python/.gitignore22
-rw-r--r--python/MANIFEST.in3
-rw-r--r--python/Makefile111
-rw-r--r--python/PACKAGE.rst43
-rw-r--r--python/Pipfile13
-rw-r--r--python/Pipfile.lock335
-rw-r--r--python/README.rst87
-rw-r--r--python/VERSION1
-rw-r--r--python/avocado.cfg13
-rw-r--r--python/qemu/README.rst8
-rw-r--r--python/qemu/aqmp/__init__.py51
-rw-r--r--python/qemu/aqmp/aqmp_tui.py652
-rw-r--r--python/qemu/aqmp/error.py50
-rw-r--r--python/qemu/aqmp/events.py717
-rw-r--r--python/qemu/aqmp/legacy.py138
-rw-r--r--python/qemu/aqmp/message.py209
-rw-r--r--python/qemu/aqmp/models.py146
-rw-r--r--python/qemu/aqmp/protocol.py917
-rw-r--r--python/qemu/aqmp/py.typed0
-rw-r--r--python/qemu/aqmp/qmp_client.py651
-rw-r--r--python/qemu/aqmp/util.py217
-rw-r--r--python/qemu/machine/README.rst9
-rw-r--r--python/qemu/machine/__init__.py36
-rw-r--r--python/qemu/machine/console_socket.py129
-rw-r--r--python/qemu/machine/machine.py837
-rw-r--r--python/qemu/machine/py.typed0
-rw-r--r--python/qemu/machine/qtest.py163
-rw-r--r--python/qemu/qmp/README.rst9
-rw-r--r--python/qemu/qmp/__init__.py422
-rw-r--r--python/qemu/qmp/py.typed0
-rw-r--r--python/qemu/qmp/qemu_ga_client.py323
-rw-r--r--python/qemu/qmp/qmp_shell.py534
-rw-r--r--python/qemu/qmp/qom.py272
-rw-r--r--python/qemu/qmp/qom_common.py178
-rw-r--r--python/qemu/qmp/qom_fuse.py206
-rw-r--r--python/qemu/utils/README.rst7
-rw-r--r--python/qemu/utils/__init__.py45
-rw-r--r--python/qemu/utils/accel.py84
-rw-r--r--python/qemu/utils/py.typed0
-rw-r--r--python/setup.cfg177
-rwxr-xr-xpython/setup.py23
-rwxr-xr-xpython/tests/flake8.sh2
-rwxr-xr-xpython/tests/iotests-mypy.sh4
-rwxr-xr-xpython/tests/iotests-pylint.sh4
-rwxr-xr-xpython/tests/isort.sh2
-rwxr-xr-xpython/tests/mypy.sh2
-rw-r--r--python/tests/protocol.py583
-rwxr-xr-xpython/tests/pylint.sh2
48 files changed, 8437 insertions, 0 deletions
diff --git a/python/.gitignore b/python/.gitignore
new file mode 100644
index 000000000..904f324bb
--- /dev/null
+++ b/python/.gitignore
@@ -0,0 +1,22 @@
+# linter/tooling cache
+.mypy_cache/
+.cache/
+
+# python packaging
+build/
+dist/
+qemu.egg-info/
+
+# editor config
+.idea/
+.vscode/
+
+# virtual environments (pipenv et al)
+.venv/
+.tox/
+.dev-venv/
+
+# Coverage.py reports
+.coverage
+.coverage.*
+htmlcov/
diff --git a/python/MANIFEST.in b/python/MANIFEST.in
new file mode 100644
index 000000000..7059ad282
--- /dev/null
+++ b/python/MANIFEST.in
@@ -0,0 +1,3 @@
+include VERSION
+include PACKAGE.rst
+exclude README.rst
diff --git a/python/Makefile b/python/Makefile
new file mode 100644
index 000000000..333431136
--- /dev/null
+++ b/python/Makefile
@@ -0,0 +1,111 @@
+QEMU_VENV_DIR=.dev-venv
+QEMU_TOX_EXTRA_ARGS ?=
+
+.PHONY: help
+help:
+ @echo "python packaging help:"
+ @echo ""
+ @echo "make check-pipenv:"
+ @echo " Run tests in pipenv's virtual environment."
+ @echo " These tests use the oldest dependencies."
+ @echo " Requires: Python 3.6 and pipenv."
+ @echo " Hint (Fedora): 'sudo dnf install python3.6 pipenv'"
+ @echo ""
+ @echo "make check-tox:"
+ @echo " Run tests against multiple python versions."
+ @echo " These tests use the newest dependencies."
+ @echo " Requires: Python 3.6 - 3.10, and tox."
+ @echo " Hint (Fedora): 'sudo dnf install python3-tox python3.10'"
+	@echo " The variable QEMU_TOX_EXTRA_ARGS can be used to pass extra"
+	@echo " arguments to tox."
+ @echo ""
+ @echo "make check-dev:"
+ @echo " Run tests in a venv against your default python3 version."
+ @echo " These tests use the newest dependencies."
+ @echo " Requires: Python 3.x"
+ @echo ""
+ @echo "make check:"
+ @echo " Run tests in your *current environment*."
+ @echo " Performs no environment setup of any kind."
+ @echo ""
+ @echo "make develop:"
+	@echo " Install deps needed for 'make check',"
+ @echo " and install the qemu package in editable mode."
+ @echo " (Can be used in or outside of a venv.)"
+ @echo ""
+ @echo "make pipenv"
+ @echo " Creates pipenv's virtual environment (.venv)"
+ @echo ""
+ @echo "make dev-venv"
+ @echo " Creates a simple venv for check-dev. ($(QEMU_VENV_DIR))"
+ @echo ""
+ @echo "make clean:"
+ @echo " Remove package build output."
+ @echo ""
+ @echo "make distclean:"
+ @echo " remove pipenv/venv files, qemu package forwarder,"
+ @echo " built distribution files, and everything from 'make clean'."
+ @echo ""
+ @echo -e "Have a nice day ^_^\n"
+
+.PHONY: pipenv
+pipenv: .venv
+.venv: Pipfile.lock
+ @PIPENV_VENV_IN_PROJECT=1 pipenv sync --dev --keep-outdated
+ rm -f pyproject.toml
+ @touch .venv
+
+.PHONY: check-pipenv
+check-pipenv: pipenv
+ @pipenv run make check
+
+.PHONY: dev-venv
+dev-venv: $(QEMU_VENV_DIR) $(QEMU_VENV_DIR)/bin/activate
+$(QEMU_VENV_DIR) $(QEMU_VENV_DIR)/bin/activate: setup.cfg
+ @echo "VENV $(QEMU_VENV_DIR)"
+ @python3 -m venv $(QEMU_VENV_DIR)
+ @( \
+ echo "ACTIVATE $(QEMU_VENV_DIR)"; \
+ . $(QEMU_VENV_DIR)/bin/activate; \
+ echo "INSTALL qemu[devel] $(QEMU_VENV_DIR)"; \
+ make develop 1>/dev/null; \
+ )
+ @touch $(QEMU_VENV_DIR)
+
+.PHONY: check-dev
+check-dev: dev-venv
+ @( \
+ echo "ACTIVATE $(QEMU_VENV_DIR)"; \
+ . $(QEMU_VENV_DIR)/bin/activate; \
+ make check; \
+ )
+
+.PHONY: develop
+develop:
+ pip3 install --disable-pip-version-check -e .[devel]
+
+.PHONY: check
+check:
+ @avocado --config avocado.cfg run tests/
+
+.PHONY: check-tox
+check-tox:
+ @tox $(QEMU_TOX_EXTRA_ARGS)
+
+.PHONY: check-coverage
+check-coverage:
+ @coverage run -m avocado --config avocado.cfg run tests/*.py
+ @coverage combine
+ @coverage html
+ @coverage report
+
+.PHONY: clean
+clean:
+ python3 setup.py clean --all
+ rm -f pyproject.toml
+
+.PHONY: distclean
+distclean: clean
+ rm -rf qemu.egg-info/ .venv/ .tox/ $(QEMU_VENV_DIR) dist/
+ rm -f .coverage .coverage.*
+ rm -rf htmlcov/
diff --git a/python/PACKAGE.rst b/python/PACKAGE.rst
new file mode 100644
index 000000000..b0b86cc4c
--- /dev/null
+++ b/python/PACKAGE.rst
@@ -0,0 +1,43 @@
+QEMU Python Tooling
+===================
+
+This package provides QEMU tooling used by the QEMU project to build,
+configure, and test QEMU. It is not a fully-fledged SDK and it is subject
+to change at any time.
+
+Usage
+-----
+
+The ``qemu.qmp`` subpackage provides a library for communicating with
+QMP servers. The ``qemu.machine`` subpackage offers rudimentary
+facilities for launching and managing QEMU processes. Refer to each
+package's documentation
+(``>>> help(qemu.qmp)``, ``>>> help(qemu.machine)``)
+for more information.
+
+Contributing
+------------
+
+This package is maintained by John Snow <jsnow@redhat.com> as part of
+the QEMU source tree. Contributions are welcome and follow the `QEMU
+patch submission process
+<https://wiki.qemu.org/Contribute/SubmitAPatch>`_, which involves
+sending patches to the QEMU development mailing list.
+
+John maintains a `GitLab staging branch
+<https://gitlab.com/jsnow/qemu/-/tree/python>`_, and there is an
+official `GitLab mirror <https://gitlab.com/qemu-project/qemu>`_.
+
+Please report bugs on the `QEMU issue tracker
+<https://gitlab.com/qemu-project/qemu/-/issues>`_ and tag ``@jsnow`` in
+the report.
+
+Optional packages necessary for running code quality analysis for this
+package can be installed with the optional dependency group "devel":
+``pip install qemu[devel]``.
+
+``make develop`` can be used to install this package in editable mode
+(to the current environment) *and* bring in testing dependencies in one
+command.
+
+``make check`` can be used to run the available tests.
diff --git a/python/Pipfile b/python/Pipfile
new file mode 100644
index 000000000..e7acb8cef
--- /dev/null
+++ b/python/Pipfile
@@ -0,0 +1,13 @@
+[[source]]
+name = "pypi"
+url = "https://pypi.org/simple"
+verify_ssl = true
+
+[dev-packages]
+qemu = {editable = true, extras = ["devel"], path = "."}
+
+[packages]
+qemu = {editable = true,path = "."}
+
+[requires]
+python_version = "3.6"
diff --git a/python/Pipfile.lock b/python/Pipfile.lock
new file mode 100644
index 000000000..d2a7dbd88
--- /dev/null
+++ b/python/Pipfile.lock
@@ -0,0 +1,335 @@
+{
+ "_meta": {
+ "hash": {
+ "sha256": "784b327272db32403d5a488507853b5afba850ba26a5948e5b6a90c1baef2d9c"
+ },
+ "pipfile-spec": 6,
+ "requires": {
+ "python_version": "3.6"
+ },
+ "sources": [
+ {
+ "name": "pypi",
+ "url": "https://pypi.org/simple",
+ "verify_ssl": true
+ }
+ ]
+ },
+ "default": {
+ "qemu": {
+ "editable": true,
+ "path": "."
+ }
+ },
+ "develop": {
+ "appdirs": {
+ "hashes": [
+ "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41",
+ "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"
+ ],
+ "version": "==1.4.4"
+ },
+ "astroid": {
+ "hashes": [
+ "sha256:09bdb456e02564731f8b5957cdd0c98a7f01d2db5e90eb1d794c353c28bfd705",
+ "sha256:6a8a51f64dae307f6e0c9db752b66a7951e282389d8362cc1d39a56f3feeb31d"
+ ],
+ "markers": "python_version ~= '3.6'",
+ "version": "==2.6.0"
+ },
+ "avocado-framework": {
+ "hashes": [
+ "sha256:244cb569f8eb4e50a22ac82e1a2b2bba2458999f4281efbe2651bd415d59c65b",
+ "sha256:6f15998b67ecd0e7dde790c4de4dd249d6df52dfe6d5cc4e2dd6596df51c3583"
+ ],
+ "index": "pypi",
+ "version": "==90.0"
+ },
+ "distlib": {
+ "hashes": [
+ "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736",
+ "sha256:23e223426b28491b1ced97dc3bbe183027419dfc7982b4fa2f05d5f3ff10711c"
+ ],
+ "version": "==0.3.2"
+ },
+ "filelock": {
+ "hashes": [
+ "sha256:18d82244ee114f543149c66a6e0c14e9c4f8a1044b5cdaadd0f82159d6a6ff59",
+ "sha256:929b7d63ec5b7d6b71b0fa5ac14e030b3f70b75747cef1b10da9b879fef15836"
+ ],
+ "version": "==3.0.12"
+ },
+ "flake8": {
+ "hashes": [
+ "sha256:6a35f5b8761f45c5513e3405f110a86bea57982c3b75b766ce7b65217abe1670",
+ "sha256:c01f8a3963b3571a8e6bd7a4063359aff90749e160778e03817cd9b71c9e07d2"
+ ],
+ "index": "pypi",
+ "version": "==3.6.0"
+ },
+ "fusepy": {
+ "hashes": [
+ "sha256:10f5c7f5414241bffecdc333c4d3a725f1d6605cae6b4eaf86a838ff49cdaf6c",
+ "sha256:a9f3a3699080ddcf0919fd1eb2cf743e1f5859ca54c2018632f939bdfac269ee"
+ ],
+ "index": "pypi",
+ "version": "==2.0.4"
+ },
+ "importlib-metadata": {
+ "hashes": [
+ "sha256:90bb658cdbbf6d1735b6341ce708fc7024a3e14e99ffdc5783edea9f9b077f83",
+ "sha256:dc15b2969b4ce36305c51eebe62d418ac7791e9a157911d58bfb1f9ccd8e2070"
+ ],
+ "markers": "python_version < '3.8'",
+ "version": "==1.7.0"
+ },
+ "importlib-resources": {
+ "hashes": [
+ "sha256:54161657e8ffc76596c4ede7080ca68cb02962a2e074a2586b695a93a925d36e",
+ "sha256:e962bff7440364183203d179d7ae9ad90cb1f2b74dcb84300e88ecc42dca3351"
+ ],
+ "markers": "python_version < '3.7'",
+ "version": "==5.1.4"
+ },
+ "isort": {
+ "hashes": [
+ "sha256:408e4d75d84f51b64d0824894afee44469eba34a4caee621dc53799f80d71ccc",
+ "sha256:64022dea6a06badfa09b300b4dfe8ba968114a737919e8ed50aea1c288f078aa"
+ ],
+ "index": "pypi",
+ "version": "==5.1.2"
+ },
+ "lazy-object-proxy": {
+ "hashes": [
+ "sha256:17e0967ba374fc24141738c69736da90e94419338fd4c7c7bef01ee26b339653",
+ "sha256:1fee665d2638491f4d6e55bd483e15ef21f6c8c2095f235fef72601021e64f61",
+ "sha256:22ddd618cefe54305df49e4c069fa65715be4ad0e78e8d252a33debf00f6ede2",
+ "sha256:24a5045889cc2729033b3e604d496c2b6f588c754f7a62027ad4437a7ecc4837",
+ "sha256:410283732af311b51b837894fa2f24f2c0039aa7f220135192b38fcc42bd43d3",
+ "sha256:4732c765372bd78a2d6b2150a6e99d00a78ec963375f236979c0626b97ed8e43",
+ "sha256:489000d368377571c6f982fba6497f2aa13c6d1facc40660963da62f5c379726",
+ "sha256:4f60460e9f1eb632584c9685bccea152f4ac2130e299784dbaf9fae9f49891b3",
+ "sha256:5743a5ab42ae40caa8421b320ebf3a998f89c85cdc8376d6b2e00bd12bd1b587",
+ "sha256:85fb7608121fd5621cc4377a8961d0b32ccf84a7285b4f1d21988b2eae2868e8",
+ "sha256:9698110e36e2df951c7c36b6729e96429c9c32b3331989ef19976592c5f3c77a",
+ "sha256:9d397bf41caad3f489e10774667310d73cb9c4258e9aed94b9ec734b34b495fd",
+ "sha256:b579f8acbf2bdd9ea200b1d5dea36abd93cabf56cf626ab9c744a432e15c815f",
+ "sha256:b865b01a2e7f96db0c5d12cfea590f98d8c5ba64ad222300d93ce6ff9138bcad",
+ "sha256:bf34e368e8dd976423396555078def5cfc3039ebc6fc06d1ae2c5a65eebbcde4",
+ "sha256:c6938967f8528b3668622a9ed3b31d145fab161a32f5891ea7b84f6b790be05b",
+ "sha256:d1c2676e3d840852a2de7c7d5d76407c772927addff8d742b9808fe0afccebdf",
+ "sha256:d7124f52f3bd259f510651450e18e0fd081ed82f3c08541dffc7b94b883aa981",
+ "sha256:d900d949b707778696fdf01036f58c9876a0d8bfe116e8d220cfd4b15f14e741",
+ "sha256:ebfd274dcd5133e0afae738e6d9da4323c3eb021b3e13052d8cbd0e457b1256e",
+ "sha256:ed361bb83436f117f9917d282a456f9e5009ea12fd6de8742d1a4752c3017e93",
+ "sha256:f5144c75445ae3ca2057faac03fda5a902eff196702b0a24daf1d6ce0650514b"
+ ],
+ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'",
+ "version": "==1.6.0"
+ },
+ "mccabe": {
+ "hashes": [
+ "sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42",
+ "sha256:dd8d182285a0fe56bace7f45b5e7d1a6ebcbf524e8f3bd87eb0f125271b8831f"
+ ],
+ "version": "==0.6.1"
+ },
+ "mypy": {
+ "hashes": [
+ "sha256:15b948e1302682e3682f11f50208b726a246ab4e6c1b39f9264a8796bb416aa2",
+ "sha256:219a3116ecd015f8dca7b5d2c366c973509dfb9a8fc97ef044a36e3da66144a1",
+ "sha256:3b1fc683fb204c6b4403a1ef23f0b1fac8e4477091585e0c8c54cbdf7d7bb164",
+ "sha256:3beff56b453b6ef94ecb2996bea101a08f1f8a9771d3cbf4988a61e4d9973761",
+ "sha256:7687f6455ec3ed7649d1ae574136835a4272b65b3ddcf01ab8704ac65616c5ce",
+ "sha256:7ec45a70d40ede1ec7ad7f95b3c94c9cf4c186a32f6bacb1795b60abd2f9ef27",
+ "sha256:86c857510a9b7c3104cf4cde1568f4921762c8f9842e987bc03ed4f160925754",
+ "sha256:8a627507ef9b307b46a1fea9513d5c98680ba09591253082b4c48697ba05a4ae",
+ "sha256:8dfb69fbf9f3aeed18afffb15e319ca7f8da9642336348ddd6cab2713ddcf8f9",
+ "sha256:a34b577cdf6313bf24755f7a0e3f3c326d5c1f4fe7422d1d06498eb25ad0c600",
+ "sha256:a8ffcd53cb5dfc131850851cc09f1c44689c2812d0beb954d8138d4f5fc17f65",
+ "sha256:b90928f2d9eb2f33162405f32dde9f6dcead63a0971ca8a1b50eb4ca3e35ceb8",
+ "sha256:c56ffe22faa2e51054c5f7a3bc70a370939c2ed4de308c690e7949230c995913",
+ "sha256:f91c7ae919bbc3f96cd5e5b2e786b2b108343d1d7972ea130f7de27fdd547cf3"
+ ],
+ "index": "pypi",
+ "version": "==0.770"
+ },
+ "mypy-extensions": {
+ "hashes": [
+ "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d",
+ "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"
+ ],
+ "version": "==0.4.3"
+ },
+ "packaging": {
+ "hashes": [
+ "sha256:5b327ac1320dc863dca72f4514ecc086f31186744b84a230374cc1fd776feae5",
+ "sha256:67714da7f7bc052e064859c05c595155bd1ee9f69f76557e21f051443c20947a"
+ ],
+ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
+ "version": "==20.9"
+ },
+ "pluggy": {
+ "hashes": [
+ "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0",
+ "sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d"
+ ],
+ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
+ "version": "==0.13.1"
+ },
+ "py": {
+ "hashes": [
+ "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3",
+ "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a"
+ ],
+ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
+ "version": "==1.10.0"
+ },
+ "pycodestyle": {
+ "hashes": [
+ "sha256:74abc4e221d393ea5ce1f129ea6903209940c1ecd29e002e8c6933c2b21026e0",
+ "sha256:cbc619d09254895b0d12c2c691e237b2e91e9b2ecf5e84c26b35400f93dcfb83",
+ "sha256:cbfca99bd594a10f674d0cd97a3d802a1fdef635d4361e1a2658de47ed261e3a"
+ ],
+ "version": "==2.4.0"
+ },
+ "pyflakes": {
+ "hashes": [
+ "sha256:9a7662ec724d0120012f6e29d6248ae3727d821bba522a0e6b356eff19126a49",
+ "sha256:f661252913bc1dbe7fcfcbf0af0db3f42ab65aabd1a6ca68fe5d466bace94dae"
+ ],
+ "version": "==2.0.0"
+ },
+ "pygments": {
+ "hashes": [
+ "sha256:a18f47b506a429f6f4b9df81bb02beab9ca21d0a5fee38ed15aef65f0545519f",
+ "sha256:d66e804411278594d764fc69ec36ec13d9ae9147193a1740cd34d272ca383b8e"
+ ],
+ "markers": "python_version >= '3.5'",
+ "version": "==2.9.0"
+ },
+ "pylint": {
+ "hashes": [
+ "sha256:082a6d461b54f90eea49ca90fff4ee8b6e45e8029e5dbd72f6107ef84f3779c0",
+ "sha256:a01cd675eccf6e25b3bdb42be184eb46aaf89187d612ba0fb5f93328ed6b0fd5"
+ ],
+ "index": "pypi",
+ "version": "==2.8.0"
+ },
+ "pyparsing": {
+ "hashes": [
+ "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1",
+ "sha256:ef9d7589ef3c200abe66653d3f1ab1033c3c419ae9b9bdb1240a85b024efc88b"
+ ],
+ "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
+ "version": "==2.4.7"
+ },
+ "qemu": {
+ "editable": true,
+ "path": "."
+ },
+ "six": {
+ "hashes": [
+ "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926",
+ "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"
+ ],
+ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
+ "version": "==1.16.0"
+ },
+ "toml": {
+ "hashes": [
+ "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b",
+ "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"
+ ],
+ "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
+ "version": "==0.10.2"
+ },
+ "tox": {
+ "hashes": [
+ "sha256:c60692d92fe759f46c610ac04c03cf0169432d1ff8e981e8ae63e068d0954fc3",
+ "sha256:f179cb4043d7dc1339425dd49ab1dd8c916246b0d9173143c1b0af7498a03ab0"
+ ],
+ "index": "pypi",
+ "version": "==3.18.0"
+ },
+ "typed-ast": {
+ "hashes": [
+ "sha256:01ae5f73431d21eead5015997ab41afa53aa1fbe252f9da060be5dad2c730ace",
+ "sha256:067a74454df670dcaa4e59349a2e5c81e567d8d65458d480a5b3dfecec08c5ff",
+ "sha256:0fb71b8c643187d7492c1f8352f2c15b4c4af3f6338f21681d3681b3dc31a266",
+ "sha256:1b3ead4a96c9101bef08f9f7d1217c096f31667617b58de957f690c92378b528",
+ "sha256:2068531575a125b87a41802130fa7e29f26c09a2833fea68d9a40cf33902eba6",
+ "sha256:209596a4ec71d990d71d5e0d312ac935d86930e6eecff6ccc7007fe54d703808",
+ "sha256:2c726c276d09fc5c414693a2de063f521052d9ea7c240ce553316f70656c84d4",
+ "sha256:398e44cd480f4d2b7ee8d98385ca104e35c81525dd98c519acff1b79bdaac363",
+ "sha256:52b1eb8c83f178ab787f3a4283f68258525f8d70f778a2f6dd54d3b5e5fb4341",
+ "sha256:5feca99c17af94057417d744607b82dd0a664fd5e4ca98061480fd8b14b18d04",
+ "sha256:7538e495704e2ccda9b234b82423a4038f324f3a10c43bc088a1636180f11a41",
+ "sha256:760ad187b1041a154f0e4d0f6aae3e40fdb51d6de16e5c99aedadd9246450e9e",
+ "sha256:777a26c84bea6cd934422ac2e3b78863a37017618b6e5c08f92ef69853e765d3",
+ "sha256:95431a26309a21874005845c21118c83991c63ea800dd44843e42a916aec5899",
+ "sha256:9ad2c92ec681e02baf81fdfa056fe0d818645efa9af1f1cd5fd6f1bd2bdfd805",
+ "sha256:9c6d1a54552b5330bc657b7ef0eae25d00ba7ffe85d9ea8ae6540d2197a3788c",
+ "sha256:aee0c1256be6c07bd3e1263ff920c325b59849dc95392a05f258bb9b259cf39c",
+ "sha256:af3d4a73793725138d6b334d9d247ce7e5f084d96284ed23f22ee626a7b88e39",
+ "sha256:b36b4f3920103a25e1d5d024d155c504080959582b928e91cb608a65c3a49e1a",
+ "sha256:b9574c6f03f685070d859e75c7f9eeca02d6933273b5e69572e5ff9d5e3931c3",
+ "sha256:bff6ad71c81b3bba8fa35f0f1921fb24ff4476235a6e94a26ada2e54370e6da7",
+ "sha256:c190f0899e9f9f8b6b7863debfb739abcb21a5c054f911ca3596d12b8a4c4c7f",
+ "sha256:c907f561b1e83e93fad565bac5ba9c22d96a54e7ea0267c708bffe863cbe4075",
+ "sha256:cae53c389825d3b46fb37538441f75d6aecc4174f615d048321b716df2757fb0",
+ "sha256:dd4a21253f42b8d2b48410cb31fe501d32f8b9fbeb1f55063ad102fe9c425e40",
+ "sha256:dde816ca9dac1d9c01dd504ea5967821606f02e510438120091b84e852367428",
+ "sha256:f2362f3cb0f3172c42938946dbc5b7843c2a28aec307c49100c8b38764eb6927",
+ "sha256:f328adcfebed9f11301eaedfa48e15bdece9b519fb27e6a8c01aa52a17ec31b3",
+ "sha256:f8afcf15cc511ada719a88e013cec87c11aff7b91f019295eb4530f96fe5ef2f",
+ "sha256:fb1bbeac803adea29cedd70781399c99138358c26d05fcbd23c13016b7f5ec65"
+ ],
+ "markers": "python_version < '3.8' and implementation_name == 'cpython'",
+ "version": "==1.4.3"
+ },
+ "typing-extensions": {
+ "hashes": [
+ "sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497",
+ "sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342",
+ "sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84"
+ ],
+ "markers": "python_version < '3.8'",
+ "version": "==3.10.0.0"
+ },
+ "urwid": {
+ "hashes": [
+ "sha256:588bee9c1cb208d0906a9f73c613d2bd32c3ed3702012f51efe318a3f2127eae"
+ ],
+ "version": "==2.1.2"
+ },
+ "urwid-readline": {
+ "hashes": [
+ "sha256:018020cbc864bb5ed87be17dc26b069eae2755cb29f3a9c569aac3bded1efaf4"
+ ],
+ "version": "==0.13"
+ },
+ "virtualenv": {
+ "hashes": [
+ "sha256:14fdf849f80dbb29a4eb6caa9875d476ee2a5cf76a5f5415fa2f1606010ab467",
+ "sha256:2b0126166ea7c9c3661f5b8e06773d28f83322de7a3ff7d06f0aed18c9de6a76"
+ ],
+ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
+ "version": "==20.4.7"
+ },
+ "wrapt": {
+ "hashes": [
+ "sha256:b62ffa81fb85f4332a4f609cab4ac40709470da05643a082ec1eb88e6d9b97d7"
+ ],
+ "version": "==1.12.1"
+ },
+ "zipp": {
+ "hashes": [
+ "sha256:3607921face881ba3e026887d8150cca609d517579abe052ac81fc5aeffdbd76",
+ "sha256:51cb66cc54621609dd593d1787f286ee42a5c0adbb4b29abea5a63edc3e03098"
+ ],
+ "markers": "python_version < '3.10'",
+ "version": "==3.4.1"
+ }
+ }
+}
diff --git a/python/README.rst b/python/README.rst
new file mode 100644
index 000000000..9c1fceaee
--- /dev/null
+++ b/python/README.rst
@@ -0,0 +1,87 @@
+QEMU Python Tooling
+===================
+
+This directory houses Python tooling used by the QEMU project to build,
+configure, and test QEMU. It is organized by namespace (``qemu``), and
+then by package (e.g. ``qemu/machine``, ``qemu/qmp``, etc).
+
+``setup.py`` is used by ``pip`` to install this tooling to the current
+environment. ``setup.cfg`` provides the packaging configuration used by
+``setup.py``. You will generally invoke it by doing one of the following:
+
+1. ``pip3 install .`` will install these packages to your current
+ environment. If you are inside a virtual environment, they will
+ install there. If you are not, it will attempt to install to the
+ global environment, which is **not recommended**.
+
+2. ``pip3 install --user .`` will install these packages to your user's
+ local python packages. If you are inside of a virtual environment,
+ this will fail; you want the first invocation above.
+
+If you append the ``--editable`` or ``-e`` argument to either invocation
+above, pip will install in "editable" mode. This installs the package as
+a forwarder ("qemu.egg-link") that points to the source tree. In so
+doing, the installed package always reflects the latest version in your
+source tree.
+
+Installing ".[devel]" instead of "." will additionally pull in required
+packages for testing this package. They are not runtime requirements,
+and are not needed to simply use these libraries.
+
+Running ``make develop`` will pull in all testing dependencies and
+install QEMU in editable mode to the current environment.
+(It is a shortcut for ``pip3 install -e .[devel]``.)
+
+See `Installing packages using pip and virtual environments
+<https://packaging.python.org/guides/installing-using-pip-and-virtual-environments/>`_
+for more information.
+
+
+Using these packages without installing them
+--------------------------------------------
+
+These packages may be used without installing them first, by using one
+of two tricks:
+
+1. Set your PYTHONPATH environment variable to include this source
+ directory, e.g. ``~/src/qemu/python``. See
+ https://docs.python.org/3/using/cmdline.html#envvar-PYTHONPATH
+
+2. Inside a Python script, use ``sys.path`` to forcibly include a search
+ path prior to importing the ``qemu`` namespace. See
+ https://docs.python.org/3/library/sys.html#sys.path
+
+A strong downside to both approaches is that they generally interfere
+with static analysis tools being able to locate and analyze the code
+being imported.
+
+Package installation also normally provides executable console scripts,
+so that tools like ``qmp-shell`` are always available via $PATH. To
+invoke them without installation, you can invoke e.g.:
+
+``> PYTHONPATH=~/src/qemu/python python3 -m qemu.qmp.qmp_shell``
+
+The mappings between console script name and python module path can be
+found in ``setup.cfg``.
+
+
+Files in this directory
+-----------------------
+
+- ``qemu/`` Python 'qemu' namespace package source directory.
+- ``tests/`` Python package tests directory.
+- ``avocado.cfg`` Configuration for the Avocado test-runner.
+ Used by ``make check`` et al.
+- ``Makefile`` provides some common testing/installation invocations.
+ Try ``make help`` to see available targets.
+- ``MANIFEST.in`` is read by python setuptools, it specifies additional files
+ that should be included by a source distribution.
+- ``PACKAGE.rst`` is used as the README file that is visible on PyPI.org.
+- ``Pipfile`` is used by Pipenv to generate ``Pipfile.lock``.
+- ``Pipfile.lock`` is a set of pinned package dependencies that this package
+ is tested under in our CI suite. It is used by ``make check-pipenv``.
+- ``README.rst`` you are here!
+- ``VERSION`` contains the PEP-440 compliant version used to describe
+ this package; it is referenced by ``setup.cfg``.
+- ``setup.cfg`` houses setuptools package configuration.
+- ``setup.py`` is the setuptools installer used by pip; See above.
diff --git a/python/VERSION b/python/VERSION
new file mode 100644
index 000000000..c19f3b832
--- /dev/null
+++ b/python/VERSION
@@ -0,0 +1 @@
+0.6.1.0a1
diff --git a/python/avocado.cfg b/python/avocado.cfg
new file mode 100644
index 000000000..c7722e7ec
--- /dev/null
+++ b/python/avocado.cfg
@@ -0,0 +1,13 @@
+[run]
+test_runner = runner
+
+[simpletests]
+# Don't show stdout/stderr in the test *summary*
+status.failure_fields = ['status']
+
+[job]
+# Don't show the full debug.log output; only select stdout/stderr.
+output.testlogs.logfiles = ['stdout', 'stderr']
+
+# Show full stdout/stderr only on tests that FAIL
+output.testlogs.statuses = ['FAIL']
diff --git a/python/qemu/README.rst b/python/qemu/README.rst
new file mode 100644
index 000000000..d04943f52
--- /dev/null
+++ b/python/qemu/README.rst
@@ -0,0 +1,8 @@
+QEMU Python Namespace
+=====================
+
+This directory serves as the root of a `Python PEP 420 implicit
+namespace package <https://www.python.org/dev/peps/pep-0420/>`_.
+
+Each directory below is assumed to be an installable Python package that
+is available under the ``qemu.<package>`` namespace.
diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
new file mode 100644
index 000000000..880d5b6fa
--- /dev/null
+++ b/python/qemu/aqmp/__init__.py
@@ -0,0 +1,51 @@
+"""
+QEMU Monitor Protocol (QMP) development library & tooling.
+
+This package provides a fairly low-level class for communicating
+asynchronously with QMP protocol servers, as implemented by QEMU, the
+QEMU Guest Agent, and the QEMU Storage Daemon.
+
+`QMPClient` provides the main functionality of this package. All errors
+raised by this library derive from `AQMPError`, see `aqmp.error` for
+additional detail. See `aqmp.events` for an in-depth tutorial on
+managing QMP events.
+"""
+
+# Copyright (C) 2020, 2021 John Snow for Red Hat, Inc.
+#
+# Authors:
+# John Snow <jsnow@redhat.com>
+#
+# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>.
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+
+import logging
+
+from .error import AQMPError
+from .events import EventListener
+from .message import Message
+from .protocol import ConnectError, Runstate, StateError
+from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
+
+
+# Suppress logging unless an application engages it.
+logging.getLogger('qemu.aqmp').addHandler(logging.NullHandler())
+
+
+# The order of these fields impact the Sphinx documentation order.
+__all__ = (
+ # Classes, most to least important
+ 'QMPClient',
+ 'Message',
+ 'EventListener',
+ 'Runstate',
+
+ # Exceptions, most generic to most explicit
+ 'AQMPError',
+ 'StateError',
+ 'ConnectError',
+ 'ExecuteError',
+ 'ExecInterruptedError',
+)
diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py
new file mode 100644
index 000000000..a2929f771
--- /dev/null
+++ b/python/qemu/aqmp/aqmp_tui.py
@@ -0,0 +1,652 @@
+# Copyright (c) 2021
+#
+# Authors:
+# Niteesh Babu G S <niteesh.gs@gmail.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+"""
+AQMP TUI
+
+AQMP TUI is an asynchronous interface built on top of the AQMP library.
+It is the successor of QMP-shell and is brought in as a replacement for it.
+
+Example Usage: aqmp-tui <SOCKET | TCP IP:PORT>
+Full Usage: aqmp-tui --help
+"""
+
+import argparse
+import asyncio
+import json
+import logging
+from logging import Handler, LogRecord
+import signal
+from typing import (
+ List,
+ Optional,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
+
+from pygments import lexers
+from pygments import token as Token
+import urwid
+import urwid_readline
+
+from ..qmp import QEMUMonitorProtocol, QMPBadPortError
+from .error import ProtocolError
+from .message import DeserializationError, Message, UnexpectedTypeError
+from .protocol import ConnectError, Runstate
+from .qmp_client import ExecInterruptedError, QMPClient
+from .util import create_task, pretty_traceback
+
+
+# The name of the signal that is used to update the history list
+UPDATE_MSG: str = 'UPDATE_MSG'
+
+
+palette = [
+ (Token.Punctuation, '', '', '', 'h15,bold', 'g7'),
+ (Token.Text, '', '', '', '', 'g7'),
+ (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'),
+ (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'),
+ (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'),
+ (Token.Keyword.Constant, '', '', '', '#6af', 'g7'),
+ ('DEBUG', '', '', '', '#ddf', 'g7'),
+ ('INFO', '', '', '', 'g100', 'g7'),
+ ('WARNING', '', '', '', '#ff6', 'g7'),
+ ('ERROR', '', '', '', '#a00', 'g7'),
+ ('CRITICAL', '', '', '', '#a00', 'g7'),
+ ('background', '', 'black', '', '', 'g7'),
+]
+
+
+def format_json(msg: str) -> str:
+ """
+ Formats valid/invalid multi-line JSON message into a single-line message.
+
+ Formatting is first tried using the standard json module. If that fails
+ due to a decoding error then a simple string manipulation is done to
+ achieve a single line JSON string.
+
+ Converting into single line is more aesthetically pleasing when looking
+ along with error messages.
+
+ Eg:
+ Input:
+ [ 1,
+ true,
+ 3 ]
+ The above input is not a valid QMP message and produces the following error
+ "QMP message is not a JSON object."
+ When displaying this in TUI in multiline mode we get
+
+ [ 1,
+ true,
+ 3 ]: QMP message is not a JSON object.
+
+ whereas in singleline mode we get the following
+
+ [1, true, 3]: QMP message is not a JSON object.
+
+ The single line mode is more aesthetically pleasing.
+
+ :param msg:
+ The message to be formatted into a single line.
+
+ :return: Formatted singleline message.
+ """
+ try:
+ msg = json.loads(msg)
+ return str(json.dumps(msg))
+ except json.decoder.JSONDecodeError:
+ msg = msg.replace('\n', '')
+ words = msg.split(' ')
+ words = list(filter(None, words))
+ return ' '.join(words)
+
+
+def has_handler_type(logger: logging.Logger,
+ handler_type: Type[Handler]) -> bool:
+ """
+ The Logger class has no interface to check if a certain type of handler is
+ installed or not. So we provide an interface to do so.
+
+ :param logger:
+ Logger object
+ :param handler_type:
+ The type of the handler to be checked.
+
+ :return: True if a handler of type `handler_type` is installed.
+ """
+ for handler in logger.handlers:
+ if isinstance(handler, handler_type):
+ return True
+ return False
+
+
+class App(QMPClient):
+ """
+ Implements the AQMP TUI.
+
+ Initializes the widgets and starts the urwid event loop.
+
+ :param address:
+ Address of the server to connect to.
+ :param num_retries:
+ The number of times to retry before giving up on reconnecting.
+ :param retry_delay:
+ The delay(sec) before each retry
+ """
+ def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
+ retry_delay: Optional[int]) -> None:
+ urwid.register_signal(type(self), UPDATE_MSG)
+ self.window = Window(self)
+ self.address = address
+ self.aloop: Optional[asyncio.AbstractEventLoop] = None
+ self.num_retries = num_retries
+ self.retry_delay = retry_delay if retry_delay else 2
+ self.retry: bool = False
+ self.exiting: bool = False
+ super().__init__()
+
+ def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
+ """
+ Appends the msg to the history list.
+
+ :param msg:
+ The raw message to be appended in string type.
+ """
+ urwid.emit_signal(self, UPDATE_MSG, msg, level)
+
+ def _cb_outbound(self, msg: Message) -> Message:
+ """
+ Callback: outbound message hook.
+
+ Appends the outgoing messages to the history box.
+
+ :param msg: raw outbound message.
+ :return: final outbound message.
+ """
+ str_msg = str(msg)
+
+ if not has_handler_type(logging.getLogger(), TUILogHandler):
+ logging.debug('Request: %s', str_msg)
+ self.add_to_history('<-- ' + str_msg)
+ return msg
+
+ def _cb_inbound(self, msg: Message) -> Message:
+ """
+ Callback: inbound message hook.
+
+ Appends the incoming messages to the history box.
+
+ :param msg: raw inbound message.
+ :return: final inbound message.
+ """
+ str_msg = str(msg)
+
+ if not has_handler_type(logging.getLogger(), TUILogHandler):
+ logging.debug('Request: %s', str_msg)
+ self.add_to_history('--> ' + str_msg)
+ return msg
+
+ async def _send_to_server(self, msg: Message) -> None:
+ """
+ This coroutine sends the message to the server.
+ The message has to be pre-validated.
+
+ :param msg:
+ Pre-validated message to be to sent to the server.
+
+ :raise Exception: When an unhandled exception is caught.
+ """
+ try:
+ await self._raw(msg, assign_id='id' not in msg)
+ except ExecInterruptedError as err:
+ logging.info('Error server disconnected before reply %s', str(err))
+ self.add_to_history('Server disconnected before reply', 'ERROR')
+ except Exception as err:
+ logging.error('Exception from _send_to_server: %s', str(err))
+ raise err
+
+ def cb_send_to_server(self, raw_msg: str) -> None:
+ """
+ Validates and sends the message to the server.
+ The raw string message is first converted into a Message object
+ and is then sent to the server.
+
+ :param raw_msg:
+ The raw string message to be sent to the server.
+
+ :raise Exception: When an unhandled exception is caught.
+ """
+ try:
+ msg = Message(bytes(raw_msg, encoding='utf-8'))
+ create_task(self._send_to_server(msg))
+ except (DeserializationError, UnexpectedTypeError) as err:
+ raw_msg = format_json(raw_msg)
+ logging.info('Invalid message: %s', err.error_message)
+ self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR')
+
+ def unhandled_input(self, key: str) -> None:
+ """
+ Handles keys which haven't been handled by the child widgets.
+
+ :param key:
+ Unhandled key
+ """
+ if key == 'esc':
+ self.kill_app()
+
+ def kill_app(self) -> None:
+ """
+ Initiates killing of app. A bridge between asynchronous and synchronous
+ code.
+ """
+ create_task(self._kill_app())
+
+ async def _kill_app(self) -> None:
+ """
+ This coroutine initiates the actual disconnect process and calls
+ urwid.ExitMainLoop() to kill the TUI.
+
+ :raise Exception: When an unhandled exception is caught.
+ """
+ self.exiting = True
+ await self.disconnect()
+ logging.debug('Disconnect finished. Exiting app')
+ raise urwid.ExitMainLoop()
+
+ async def disconnect(self) -> None:
+ """
+ Overrides the disconnect method to handle the errors locally.
+ """
+ try:
+ await super().disconnect()
+ except (OSError, EOFError) as err:
+ logging.info('disconnect: %s', str(err))
+ self.retry = True
+ except ProtocolError as err:
+ logging.info('disconnect: %s', str(err))
+ except Exception as err:
+ logging.error('disconnect: Unhandled exception %s', str(err))
+ raise err
+
+ def _set_status(self, msg: str) -> None:
+ """
+ Sets the message as the status.
+
+ :param msg:
+ The message to be displayed in the status bar.
+ """
+ self.window.footer.set_text(msg)
+
+ def _get_formatted_address(self) -> str:
+ """
+ Returns a formatted version of the server's address.
+
+ :return: formatted address
+ """
+ if isinstance(self.address, tuple):
+ host, port = self.address
+ addr = f'{host}:{port}'
+ else:
+ addr = f'{self.address}'
+ return addr
+
+ async def _initiate_connection(self) -> Optional[ConnectError]:
+ """
+ Tries connecting to a server a number of times with a delay between
+ each try. If all retries failed then return the error faced during
+ the last retry.
+
+ :return: Error faced during last retry.
+ """
+ current_retries = 0
+ err = None
+
+ # initial try
+ await self.connect_server()
+ while self.retry and current_retries < self.num_retries:
+ logging.info('Connection Failed, retrying in %d', self.retry_delay)
+ status = f'[Retry #{current_retries} ({self.retry_delay}s)]'
+ self._set_status(status)
+
+ await asyncio.sleep(self.retry_delay)
+
+ err = await self.connect_server()
+ current_retries += 1
+ # If all retries failed report the last error
+ if err:
+ logging.info('All retries failed: %s', err)
+ return err
+ return None
+
+ async def manage_connection(self) -> None:
+ """
+ Manage the connection based on the current run state.
+
+ A reconnect is issued when the current state is IDLE and the number
+ of retries is not exhausted.
+ A disconnect is issued when the current state is DISCONNECTING.
+ """
+ while not self.exiting:
+ if self.runstate == Runstate.IDLE:
+ err = await self._initiate_connection()
+ # If retry is still true then, we have exhausted all our tries.
+ if err:
+ self._set_status(f'[Error: {err.error_message}]')
+ else:
+ addr = self._get_formatted_address()
+ self._set_status(f'[Connected {addr}]')
+ elif self.runstate == Runstate.DISCONNECTING:
+ self._set_status('[Disconnected]')
+ await self.disconnect()
+ # check if a retry is needed
+ if self.runstate == Runstate.IDLE:
+ continue
+ await self.runstate_changed()
+
+ async def connect_server(self) -> Optional[ConnectError]:
+ """
+ Initiates a connection to the server at address `self.address`
+ and in case of a failure, sets the status to the respective error.
+ """
+ try:
+ await self.connect(self.address)
+ self.retry = False
+ except ConnectError as err:
+ logging.info('connect_server: ConnectError %s', str(err))
+ self.retry = True
+ return err
+ return None
+
+ def run(self, debug: bool = False) -> None:
+ """
+ Starts the long running co-routines and the urwid event loop.
+
+ :param debug:
+ Enables/Disables asyncio event loop debugging
+ """
+ screen = urwid.raw_display.Screen()
+ screen.set_terminal_properties(256)
+
+ self.aloop = asyncio.get_event_loop()
+ self.aloop.set_debug(debug)
+
+ # Gracefully handle SIGTERM and SIGINT signals
+ cancel_signals = [signal.SIGTERM, signal.SIGINT]
+ for sig in cancel_signals:
+ self.aloop.add_signal_handler(sig, self.kill_app)
+
+ event_loop = urwid.AsyncioEventLoop(loop=self.aloop)
+ main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'),
+ unhandled_input=self.unhandled_input,
+ screen=screen,
+ palette=palette,
+ handle_mouse=True,
+ event_loop=event_loop)
+
+ create_task(self.manage_connection(), self.aloop)
+ try:
+ main_loop.run()
+ except Exception as err:
+ logging.error('%s\n%s\n', str(err), pretty_traceback())
+ raise err
+
+
+class StatusBar(urwid.Text):
+ """
+ A simple statusbar modelled using the Text widget. The status can be
+ set using the set_text function. All text set is aligned to right.
+
+ :param text: Initial text to be displayed. Default is empty str.
+ """
+ def __init__(self, text: str = ''):
+ super().__init__(text, align='right')
+
+
+class Editor(urwid_readline.ReadlineEdit):
+ """
+ A simple editor modelled using the urwid_readline.ReadlineEdit widget.
+ Mimics GNU readline shortcuts and provides history support.
+
+ The readline shortcuts can be found below:
+ https://github.com/rr-/urwid_readline#features
+
+ Along with the readline features, this editor also has support for
+ history. Pressing the 'up'/'down' switches between the prev/next messages
+ available in the history.
+
+ Currently there is no support to save the history to a file. The history of
+ previous commands is lost on exit.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ super().__init__(caption='> ', multiline=True)
+ self.parent = parent
+ self.history: List[str] = []
+ self.last_index: int = -1
+ self.show_history: bool = False
+
+ def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]:
+ """
+ Handles the keypress on this widget.
+
+ :param size:
+ The current size of the widget.
+ :param key:
+ The key to be handled.
+
+ :return: Unhandled key if any.
+ """
+ msg = self.get_edit_text()
+ if key == 'up' and not msg:
+ # Show the history when 'up arrow' is pressed with no input text.
+ # NOTE: The show_history logic is necessary because in 'multiline'
+ # mode (which we use) 'up arrow' is used to move between lines.
+ if not self.history:
+ return None
+ self.show_history = True
+ last_msg = self.history[self.last_index]
+ self.set_edit_text(last_msg)
+ self.edit_pos = len(last_msg)
+ elif key == 'up' and self.show_history:
+ self.last_index = max(self.last_index - 1, -len(self.history))
+ self.set_edit_text(self.history[self.last_index])
+ self.edit_pos = len(self.history[self.last_index])
+ elif key == 'down' and self.show_history:
+ if self.last_index == -1:
+ self.set_edit_text('')
+ self.show_history = False
+ else:
+ self.last_index += 1
+ self.set_edit_text(self.history[self.last_index])
+ self.edit_pos = len(self.history[self.last_index])
+ elif key == 'meta enter':
+ # When using multiline, enter inserts a new line into the editor
+ # send the input to the server on alt + enter
+ self.parent.cb_send_to_server(msg)
+ self.history.append(msg)
+ self.set_edit_text('')
+ self.last_index = -1
+ self.show_history = False
+ else:
+ self.show_history = False
+ self.last_index = -1
+ return cast(Optional[str], super().keypress(size, key))
+ return None
+
+
+class EditorWidget(urwid.Filler):
+ """
+ Wrapper around the editor widget.
+
+ The Editor is a flow widget and has to be wrapped inside a box widget.
+ This class wraps the Editor inside filler widget.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ super().__init__(Editor(parent), valign='top')
+
+
+class HistoryBox(urwid.ListBox):
+ """
+ This widget is modelled using the ListBox widget and contains the list of
+ all messages, both QMP messages and log messages, to be shown in the TUI.
+
+ The messages are urwid.Text widgets. On every append of a message, the
+ focus is shifted to the last appended message.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ self.parent = parent
+ self.history = urwid.SimpleFocusListWalker([])
+ super().__init__(self.history)
+
+ def add_to_history(self,
+ history: Union[str, List[Tuple[str, str]]]) -> None:
+ """
+ Appends a message to the list and set the focus to the last appended
+ message.
+
+ :param history:
+ The history item(message/event) to be appended to the list.
+ """
+ self.history.append(urwid.Text(history))
+ self.history.set_focus(len(self.history) - 1)
+
+ def mouse_event(self, size: Tuple[int, int], _event: str, button: float,
+ _x: int, _y: int, focus: bool) -> None:
+ # Unfortunately there are no urwid constants that represent the mouse
+ # events.
+ if button == 4: # Scroll up event
+ super().keypress(size, 'up')
+ elif button == 5: # Scroll down event
+ super().keypress(size, 'down')
+
+
+class HistoryWindow(urwid.Frame):
+ """
+ This window composes the HistoryBox and EditorWidget in a horizontal split.
+ By default the first focus is given to the history box.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ self.parent = parent
+ self.editor_widget = EditorWidget(parent)
+ self.editor = urwid.LineBox(self.editor_widget)
+ self.history = HistoryBox(parent)
+ self.body = urwid.Pile([('weight', 80, self.history),
+ ('weight', 20, self.editor)])
+ super().__init__(self.body)
+ urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history)
+
+ def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None:
+ """
+ Appends a message to the history box
+
+ :param msg:
+ The message to be appended to the history box.
+ :param level:
+ The log level of the message, if it is a log message.
+ """
+ formatted = []
+ if level:
+ msg = f'[{level}]: {msg}'
+ formatted.append((level, msg))
+ else:
+ lexer = lexers.JsonLexer() # pylint: disable=no-member
+ for token in lexer.get_tokens(msg):
+ formatted.append(token)
+ self.history.add_to_history(formatted)
+
+
+class Window(urwid.Frame):
+ """
+ This window is the top most widget of the TUI and will contain other
+ windows. Each child of this widget is responsible for displaying a specific
+ functionality.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ self.parent = parent
+ footer = StatusBar()
+ body = HistoryWindow(parent)
+ super().__init__(body, footer=footer)
+
+
+class TUILogHandler(Handler):
+ """
+ This handler routes all the log messages to the TUI screen.
+ It is installed on the root logger so that the log messages from all
+ libraries being used are routed to the screen.
+
+ :param tui: Reference to the TUI object.
+ """
+ def __init__(self, tui: App) -> None:
+ super().__init__()
+ self.tui = tui
+
+ def emit(self, record: LogRecord) -> None:
+ """
+ Emits a record to the TUI screen.
+
+ Appends the log message to the TUI screen
+ """
+ level = record.levelname
+ msg = record.getMessage()
+ self.tui.add_to_history(msg, level)
+
+
+def main() -> None:
+ """
+ Driver of the whole script: parses arguments, initializes the TUI and
+ the logger.
+ """
+ parser = argparse.ArgumentParser(description='AQMP TUI')
+ parser.add_argument('qmp_server', help='Address of the QMP server. '
+ 'Format <UNIX socket path | TCP addr:port>')
+ parser.add_argument('--num-retries', type=int, default=10,
+ help='Number of times to reconnect before giving up.')
+ parser.add_argument('--retry-delay', type=int,
+ help='Time(s) to wait before next retry. '
+ 'Default action is to wait 2s between each retry.')
+ parser.add_argument('--log-file', help='The Log file name')
+ parser.add_argument('--log-level', default='WARNING',
+ help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
+ parser.add_argument('--asyncio-debug', action='store_true',
+ help='Enable debug mode for asyncio loop. '
+ 'Generates lot of output, makes TUI unusable when '
+ 'logs are logged in the TUI. '
+ 'Use only when logging to a file.')
+ args = parser.parse_args()
+
+ try:
+ address = QEMUMonitorProtocol.parse_address(args.qmp_server)
+ except QMPBadPortError as err:
+ parser.error(str(err))
+
+ app = App(address, args.num_retries, args.retry_delay)
+
+ root_logger = logging.getLogger()
+ root_logger.setLevel(logging.getLevelName(args.log_level))
+
+ if args.log_file:
+ root_logger.addHandler(logging.FileHandler(args.log_file))
+ else:
+ root_logger.addHandler(TUILogHandler(app))
+
+ app.run(args.asyncio_debug)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/qemu/aqmp/error.py b/python/qemu/aqmp/error.py
new file mode 100644
index 000000000..781f49b00
--- /dev/null
+++ b/python/qemu/aqmp/error.py
@@ -0,0 +1,50 @@
+"""
+AQMP Error Classes
+
+This package seeks to provide semantic error classes that are intended
+to be used directly by clients when they would like to handle particular
+semantic failures (e.g. "failed to connect") without needing to know the
+enumeration of possible reasons for that failure.
+
+AQMPError serves as the ancestor for all exceptions raised by this
+package, and is suitable for use in handling semantic errors from this
+library. In most cases, individual public methods will attempt to catch
+and re-encapsulate various exceptions to provide a semantic
+error-handling interface.
+
+.. admonition:: AQMP Exception Hierarchy Reference
+
+ | `Exception`
+ | +-- `AQMPError`
+ | +-- `ConnectError`
+ | +-- `StateError`
+ | +-- `ExecInterruptedError`
+ | +-- `ExecuteError`
+ | +-- `ListenerError`
+ | +-- `ProtocolError`
+ | +-- `DeserializationError`
+ | +-- `UnexpectedTypeError`
+ | +-- `ServerParseError`
+ | +-- `BadReplyError`
+ | +-- `GreetingError`
+ | +-- `NegotiationError`
+"""
+
+
+class AQMPError(Exception):
+ """Abstract error class for all errors originating from this package."""
+
+
+class ProtocolError(AQMPError):
+ """
+ Abstract error class for protocol failures.
+
+ Semantically, these errors are generally the fault of either the
+ protocol server or as a result of a bug in this library.
+
+ :param error_message: Human-readable string describing the error.
+ """
+ def __init__(self, error_message: str):
+ super().__init__(error_message)
+ #: Human-readable error message, without any prefix.
+ self.error_message: str = error_message
diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py
new file mode 100644
index 000000000..5f7150c78
--- /dev/null
+++ b/python/qemu/aqmp/events.py
@@ -0,0 +1,717 @@
+"""
+AQMP Events and EventListeners
+
+Asynchronous QMP uses `EventListener` objects to listen for events. An
+`EventListener` is a FIFO event queue that can be pre-filtered to listen
+for only specific events. Each `EventListener` instance receives its own
+copy of events that it hears, so events may be consumed without fear or
+worry of depriving other listeners of events they need to hear.
+
+
+EventListener Tutorial
+----------------------
+
+In all of the following examples, we assume that we have a `QMPClient`
+instantiated named ``qmp`` that is already connected.
+
+
+`listener()` context blocks with one name
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The most basic usage is by using the `listener()` context manager to
+construct them:
+
+.. code:: python
+
+ with qmp.listener('STOP') as listener:
+ await qmp.execute('stop')
+ await listener.get()
+
+The listener is active only for the duration of the ‘with’ block. This
+instance listens only for ‘STOP’ events.
+
+
+`listener()` context blocks with two or more names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Multiple events can be selected for by providing any ``Iterable[str]``:
+
+.. code:: python
+
+ with qmp.listener(('STOP', 'RESUME')) as listener:
+ await qmp.execute('stop')
+ event = await listener.get()
+ assert event['event'] == 'STOP'
+
+ await qmp.execute('cont')
+ event = await listener.get()
+ assert event['event'] == 'RESUME'
+
+
+`listener()` context blocks with no names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+By omitting names entirely, you can listen to ALL events.
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ await qmp.execute('stop')
+ event = await listener.get()
+ assert event['event'] == 'STOP'
+
+This isn’t a very good use case for this feature: In a non-trivial
+running system, we may not know what event will arrive next. Grabbing
+the top of a FIFO queue returning multiple kinds of events may be prone
+to error.
+
+
+Using async iterators to retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you’d like to simply watch what events happen to arrive, you can use
+the listener as an async iterator:
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+
+This is analogous to the following code:
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ while True:
+ event = listener.get()
+ print(f"Event arrived: {event['event']}")
+
+This event stream will never end, so these blocks will never terminate.
+
+
+Using asyncio.Task to concurrently retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Since a listener’s event stream will never terminate, it is not likely
+useful to use that form in a script. For longer-running clients, we can
+create event handlers by using `asyncio.Task` to create concurrent
+coroutines:
+
+.. code:: python
+
+ async def print_events(listener):
+ try:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ with qmp.listener() as listener:
+ task = asyncio.Task(print_events(listener))
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+ task.cancel()
+ await task
+
+However, there is no guarantee that these events will be received by the
+time we leave this context block. Once the context block is exited, the
+listener will cease to hear any new events, and becomes inert.
+
+Be mindful of the timing: the above example will *probably*– but does
+not *guarantee*– that both STOP/RESUMED events will be printed. The
+example below outlines how to use listeners outside of a context block.
+
+
+Using `register_listener()` and `remove_listener()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To create a listener with a longer lifetime, beyond the scope of a
+single block, create a listener and then call `register_listener()`:
+
+.. code:: python
+
+ class MyClient:
+ def __init__(self, qmp):
+ self.qmp = qmp
+ self.listener = EventListener()
+
+ async def print_events(self):
+ try:
+ async for event in self.listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ async def run(self):
+ self.task = asyncio.Task(self.print_events)
+ self.qmp.register_listener(self.listener)
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+
+ async def stop(self):
+ self.task.cancel()
+ await self.task
+ self.qmp.remove_listener(self.listener)
+
+The listener can be deactivated by using `remove_listener()`. When it is
+removed, any possible pending events are cleared and it can be
+re-registered at a later time.
+
+
+Using the built-in all events listener
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The `QMPClient` object creates its own default listener named
+:py:obj:`~Events.events` that can be used for the same purpose without
+having to create your own:
+
+.. code:: python
+
+ async def print_events(listener):
+ try:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ task = asyncio.Task(print_events(qmp.events))
+
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+
+ task.cancel()
+ await task
+
+
+Using both .get() and async iterators
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The async iterator and `get()` methods pull events from the same FIFO
+queue. If you mix the usage of both, be aware: Events are emitted
+precisely once per listener.
+
+If multiple contexts try to pull events from the same listener instance,
+events are still emitted only precisely once.
+
+This restriction can be lifted by creating additional listeners.
+
+
+Creating multiple listeners
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Additional `EventListener` objects can be created at-will. Each one
+receives its own copy of events, with separate FIFO event queues.
+
+.. code:: python
+
+ my_listener = EventListener()
+ qmp.register_listener(my_listener)
+
+ await qmp.execute('stop')
+ copy1 = await my_listener.get()
+ copy2 = await qmp.events.get()
+
+ assert copy1 == copy2
+
+In this example, we await an event from both a user-created
+`EventListener` and the built-in events listener. Both receive the same
+event.
+
+
+Clearing listeners
+~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be cleared, clearing all events seen thus far:
+
+.. code:: python
+
+ await qmp.execute('stop')
+ qmp.events.clear()
+ await qmp.execute('cont')
+ event = await qmp.events.get()
+ assert event['event'] == 'RESUME'
+
+`EventListener` objects are FIFO queues. If events are not consumed,
+they will remain in the queue until they are witnessed or discarded via
+`clear()`. FIFO queues will be drained automatically upon leaving a
+context block, or when calling `remove_listener()`.
+
+
+Accessing listener history
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects record their history. Even after being cleared,
+you can obtain a record of all events seen so far:
+
+.. code:: python
+
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+ qmp.events.clear()
+
+ assert len(qmp.events.history) == 2
+ assert qmp.events.history[0]['event'] == 'STOP'
+ assert qmp.events.history[1]['event'] == 'RESUME'
+
+The history is updated immediately and does not require the event to be
+witnessed first.
+
+
+Using event filters
+~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be given complex filtering criteria if names
+are not sufficient:
+
+.. code:: python
+
+ def job1_filter(event) -> bool:
+ event_data = event.get('data', {})
+ event_job_id = event_data.get('id')
+ return event_job_id == "job1"
+
+ with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
+ async for event in listener:
+ if event['data']['status'] == 'concluded':
+ break
+
+These filters might be most useful when parameterized. `EventListener`
+objects expect a function that takes only a single argument (the raw
+event, as a `Message`) and returns a bool; True if the event should be
+accepted into the stream. You can create a function that adapts this
+signature to accept configuration parameters:
+
+.. code:: python
+
+ def job_filter(job_id: str) -> EventFilter:
+ def filter(event: Message) -> bool:
+ return event['data']['id'] == job_id
+ return filter
+
+ with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
+ async for event in listener:
+ if event['data']['status'] == 'concluded':
+ break
+
+
+Activating an existing listener with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Listeners with complex, long configurations can also be created manually
+and activated temporarily by using `listen()` instead of `listener()`:
+
+.. code:: python
+
+ listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+ 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+ 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+
+ with qmp.listen(listener):
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
+ async for event in listener:
+ print(event)
+ if event['event'] == 'BLOCK_JOB_COMPLETED':
+ break
+
+Any events that are not witnessed by the time the block is left will be
+cleared from the queue; entering the block is an implicit
+`register_listener()` and leaving the block is an implicit
+`remove_listener()`.
+
+
+Activating multiple existing listeners with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+While `listener()` is only capable of creating a single listener,
+`listen()` is capable of activating multiple listeners simultaneously:
+
+.. code:: python
+
+ def job_filter(job_id: str) -> EventFilter:
+ def filter(event: Message) -> bool:
+ return event['data']['id'] == job_id
+ return filter
+
+ jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
+ jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
+
+ with qmp.listen(jobA, jobB):
+ qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
+ qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
+
+ async for event in jobA.get():
+ if event['data']['status'] == 'concluded':
+ break
+ async for event in jobB.get():
+ if event['data']['status'] == 'concluded':
+ break
+
+
+Extending the `EventListener` class
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In the case that a more specialized `EventListener` is desired to
+provide either more functionality or more compact syntax for specialized
+cases, it can be extended.
+
+One of the key methods to extend or override is
+:py:meth:`~EventListener.accept()`. The default implementation checks an
+incoming message for:
+
+1. A qualifying name, if any :py:obj:`~EventListener.names` were
+ specified at initialization time
+2. That :py:obj:`~EventListener.event_filter()` returns True.
+
+This can be modified however you see fit to change the criteria for
+inclusion in the stream.
+
+For convenience, a ``JobListener`` class could be created that simply
+bakes in configuration so it does not need to be repeated:
+
+.. code:: python
+
+ class JobListener(EventListener):
+ def __init__(self, job_id: str):
+ super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+ 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+ 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+ self.job_id = job_id
+
+ def accept(self, event) -> bool:
+ if not super().accept(event):
+ return False
+ if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
+                return event['data']['id'] == self.job_id
+            return event['data']['device'] == self.job_id
+
+From here on out, you can conjure up a custom-purpose listener that
+listens only for job-related events for a specific job-id easily:
+
+.. code:: python
+
+ listener = JobListener('job4')
+    with qmp.listen(listener):
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
+ async for event in listener:
+ print(event)
+ if event['event'] == 'BLOCK_JOB_COMPLETED':
+ break
+
+
+Experimental Interfaces & Design Issues
+---------------------------------------
+
+These interfaces are not ones I am sure I will keep or otherwise modify
+heavily.
+
+qmp.listen()’s type signature
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`listen()` does not return anything, because it was assumed the caller
+already had a handle to the listener. However, for
+``qmp.listen(EventListener())`` forms, the caller will not have saved
+a handle to the listener.
+
+Because this function can accept *many* listeners, I found it hard to
+accurately type in a way where it could be used in both “one” or “many”
+forms conveniently and in a statically type-safe manner.
+
+Ultimately, I removed the return altogether, but perhaps with more time
+I can work out a way to re-add it.
+
+
+API Reference
+-------------
+
+"""
+
+import asyncio
+from contextlib import contextmanager
+import logging
+from typing import (
+ AsyncIterator,
+ Callable,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+)
+
+from .error import AQMPError
+from .message import Message
+
+
+EventNames = Union[str, Iterable[str], None]
+EventFilter = Callable[[Message], bool]
+
+
+class ListenerError(AQMPError):
+ """
+ Generic error class for `EventListener`-related problems.
+ """
+
+
+class EventListener:
+ """
+ Selectively listens for events with runtime configurable filtering.
+
+ This class is designed to be directly usable for the most common cases,
+ but it can be extended to provide more rigorous control.
+
+ :param names:
+ One or more names of events to listen for.
+ When not provided, listen for ALL events.
+ :param event_filter:
+ An optional event filtering function.
+ When names are also provided, this acts as a secondary filter.
+
+ When ``names`` and ``event_filter`` are both provided, the names
+ will be filtered first, and then the filter function will be called
+ second. The event filter function can assume that the format of the
+ event is a known format.
+ """
+ def __init__(
+ self,
+ names: EventNames = None,
+ event_filter: Optional[EventFilter] = None,
+ ):
+ # Queue of 'heard' events yet to be witnessed by a caller.
+ self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
+
+ # Intended as a historical record, NOT a processing queue or backlog.
+ self._history: List[Message] = []
+
+ #: Primary event filter, based on one or more event names.
+ self.names: Set[str] = set()
+ if isinstance(names, str):
+ self.names.add(names)
+ elif names is not None:
+ self.names.update(names)
+
+ #: Optional, secondary event filter.
+ self.event_filter: Optional[EventFilter] = event_filter
+
+ @property
+ def history(self) -> Tuple[Message, ...]:
+ """
+ A read-only history of all events seen so far.
+
+ This represents *every* event, including those not yet witnessed
+ via `get()` or ``async for``. It persists between `clear()`
+ calls and is immutable.
+ """
+ return tuple(self._history)
+
+ def accept(self, event: Message) -> bool:
+ """
+ Determine if this listener accepts this event.
+
+ This method determines which events will appear in the stream.
+ The default implementation simply checks the event against the
+ list of names and the event_filter to decide if this
+ `EventListener` accepts a given event. It can be
+ overridden/extended to provide custom listener behavior.
+
+ User code is not expected to need to invoke this method.
+
+ :param event: The event under consideration.
+ :return: `True`, if this listener accepts this event.
+ """
+ name_ok = (not self.names) or (event['event'] in self.names)
+ return name_ok and (
+ (not self.event_filter) or self.event_filter(event)
+ )
+
+ async def put(self, event: Message) -> None:
+ """
+ Conditionally put a new event into the FIFO queue.
+
+ This method is not designed to be invoked from user code, and it
+ should not need to be overridden. It is a public interface so
+ that `QMPClient` has an interface by which it can inform
+ registered listeners of new events.
+
+ The event will be put into the queue if
+ :py:meth:`~EventListener.accept()` returns `True`.
+
+ :param event: The new event to put into the FIFO queue.
+ """
+ if not self.accept(event):
+ return
+
+ self._history.append(event)
+ await self._queue.put(event)
+
+ async def get(self) -> Message:
+ """
+ Wait for the very next event in this stream.
+
+ If one is already available, return that one.
+ """
+ return await self._queue.get()
+
+ def empty(self) -> bool:
+ """
+ Return `True` if there are no pending events.
+ """
+ return self._queue.empty()
+
+ def clear(self) -> List[Message]:
+ """
+ Clear this listener of all pending events.
+
+ Called when an `EventListener` is being unregistered, this clears the
+        pending FIFO queue synchronously. It can also be used to
+ manually clear any pending events, if desired.
+
+ :return: The cleared events, if any.
+
+ .. warning::
+ Take care when discarding events. Cleared events will be
+ silently tossed on the floor. All events that were ever
+            accepted by this listener are visible in `history`.
+ """
+ events = []
+ while True:
+ try:
+ events.append(self._queue.get_nowait())
+ except asyncio.QueueEmpty:
+ break
+
+ return events
+
+ def __aiter__(self) -> AsyncIterator[Message]:
+ return self
+
+ async def __anext__(self) -> Message:
+ """
+ Enables the `EventListener` to function as an async iterator.
+
+ It may be used like this:
+
+ .. code:: python
+
+ async for event in listener:
+ print(event)
+
+ These iterators will never terminate of their own accord; you
+ must provide break conditions or otherwise prepare to run them
+ in an `asyncio.Task` that can be cancelled.
+ """
+ return await self.get()
+
+
+class Events:
+ """
+ Events is a mix-in class that adds event functionality to the QMP class.
+
+ It's designed specifically as a mix-in for `QMPClient`, and it
+ relies upon the class it is being mixed into having a 'logger'
+ property.
+ """
+ def __init__(self) -> None:
+ self._listeners: List[EventListener] = []
+
+ #: Default, all-events `EventListener`.
+ self.events: EventListener = EventListener()
+ self.register_listener(self.events)
+
+ # Parent class needs to have a logger
+ self.logger: logging.Logger
+
+ async def _event_dispatch(self, msg: Message) -> None:
+ """
+ Given a new event, propagate it to all of the active listeners.
+
+ :param msg: The event to propagate.
+ """
+ for listener in self._listeners:
+ await listener.put(msg)
+
+ def register_listener(self, listener: EventListener) -> None:
+ """
+ Register and activate an `EventListener`.
+
+ :param listener: The listener to activate.
+ :raise ListenerError: If the given listener is already registered.
+ """
+ if listener in self._listeners:
+ raise ListenerError("Attempted to re-register existing listener")
+ self.logger.debug("Registering %s.", str(listener))
+ self._listeners.append(listener)
+
+ def remove_listener(self, listener: EventListener) -> None:
+ """
+ Unregister and deactivate an `EventListener`.
+
+ The removed listener will have its pending events cleared via
+ `clear()`. The listener can be re-registered later when
+ desired.
+
+ :param listener: The listener to deactivate.
+ :raise ListenerError: If the given listener is not registered.
+ """
+ if listener == self.events:
+ raise ListenerError("Cannot remove the default listener.")
+ self.logger.debug("Removing %s.", str(listener))
+ listener.clear()
+ self._listeners.remove(listener)
+
+ @contextmanager
+ def listen(self, *listeners: EventListener) -> Iterator[None]:
+ r"""
+ Context manager: Temporarily listen with an `EventListener`.
+
+ Accepts one or more `EventListener` objects and registers them,
+ activating them for the duration of the context block.
+
+ `EventListener` objects will have any pending events in their
+ FIFO queue cleared upon exiting the context block, when they are
+ deactivated.
+
+ :param \*listeners: One or more EventListeners to activate.
+ :raise ListenerError: If the given listener(s) are already active.
+ """
+ _added = []
+
+ try:
+ for listener in listeners:
+ self.register_listener(listener)
+ _added.append(listener)
+
+ yield
+
+ finally:
+ for listener in _added:
+ self.remove_listener(listener)
+
+ @contextmanager
+ def listener(
+ self,
+ names: EventNames = (),
+ event_filter: Optional[EventFilter] = None
+ ) -> Iterator[EventListener]:
+ """
+ Context manager: Temporarily listen with a new `EventListener`.
+
+ Creates an `EventListener` object and registers it, activating
+ it for the duration of the context block.
+
+ :param names:
+ One or more names of events to listen for.
+ When not provided, listen for ALL events.
+ :param event_filter:
+ An optional event filtering function.
+ When names are also provided, this acts as a secondary filter.
+
+ :return: The newly created and active `EventListener`.
+ """
+ listener = EventListener(names, event_filter)
+ with self.listen(listener):
+ yield listener
diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py
new file mode 100644
index 000000000..9e7b9fb80
--- /dev/null
+++ b/python/qemu/aqmp/legacy.py
@@ -0,0 +1,138 @@
+"""
+Sync QMP Wrapper
+
+This class pretends to be qemu.qmp.QEMUMonitorProtocol.
+"""
+
+import asyncio
+from typing import (
+ Awaitable,
+ List,
+ Optional,
+ TypeVar,
+ Union,
+)
+
+import qemu.qmp
+from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
+
+from .qmp_client import QMPClient
+
+
+# pylint: disable=missing-docstring
+
+
+class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
+ def __init__(self, address: SocketAddrT,
+ server: bool = False,
+ nickname: Optional[str] = None):
+
+ # pylint: disable=super-init-not-called
+ self._aqmp = QMPClient(nickname)
+ self._aloop = asyncio.get_event_loop()
+ self._address = address
+ self._timeout: Optional[float] = None
+
+ _T = TypeVar('_T')
+
+ def _sync(
+ self, future: Awaitable[_T], timeout: Optional[float] = None
+ ) -> _T:
+ return self._aloop.run_until_complete(
+ asyncio.wait_for(future, timeout=timeout)
+ )
+
+ def _get_greeting(self) -> Optional[QMPMessage]:
+ if self._aqmp.greeting is not None:
+ # pylint: disable=protected-access
+ return self._aqmp.greeting._asdict()
+ return None
+
+ # __enter__ and __exit__ need no changes
+ # parse_address needs no changes
+
+ def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
+ self._aqmp.await_greeting = negotiate
+ self._aqmp.negotiate = negotiate
+
+ self._sync(
+ self._aqmp.connect(self._address)
+ )
+ return self._get_greeting()
+
+ def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
+ self._aqmp.await_greeting = True
+ self._aqmp.negotiate = True
+
+ self._sync(
+ self._aqmp.accept(self._address),
+ timeout
+ )
+
+ ret = self._get_greeting()
+ assert ret is not None
+ return ret
+
+ def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
+ return dict(
+ self._sync(
+ # pylint: disable=protected-access
+
+ # _raw() isn't a public API, because turning off
+ # automatic ID assignment is discouraged. For
+ # compatibility with iotests *only*, do it anyway.
+ self._aqmp._raw(qmp_cmd, assign_id=False),
+ self._timeout
+ )
+ )
+
+ # Default impl of cmd() delegates to cmd_obj
+
+ def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
+ return self._sync(
+ self._aqmp.execute(cmd, kwds),
+ self._timeout
+ )
+
+ def pull_event(self,
+ wait: Union[bool, float] = False) -> Optional[QMPMessage]:
+ if not wait:
+ # wait is False/0: "do not wait, do not except."
+ if self._aqmp.events.empty():
+ return None
+
+ # If wait is 'True', wait forever. If wait is False/0, the events
+ # queue must not be empty; but it still needs some real amount
+ # of time to complete.
+ timeout = None
+ if wait and isinstance(wait, float):
+ timeout = wait
+
+ return dict(
+ self._sync(
+ self._aqmp.events.get(),
+ timeout
+ )
+ )
+
+ def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
+ events = [dict(x) for x in self._aqmp.events.clear()]
+ if events:
+ return events
+
+ event = self.pull_event(wait)
+ return [event] if event is not None else []
+
+ def clear_events(self) -> None:
+ self._aqmp.events.clear()
+
+ def close(self) -> None:
+ self._sync(
+ self._aqmp.disconnect()
+ )
+
+ def settimeout(self, timeout: Optional[float]) -> None:
+ self._timeout = timeout
+
+ def send_fd_scm(self, fd: int) -> None:
+ self._aqmp.send_fd_scm(fd)
diff --git a/python/qemu/aqmp/message.py b/python/qemu/aqmp/message.py
new file mode 100644
index 000000000..f76ccc907
--- /dev/null
+++ b/python/qemu/aqmp/message.py
@@ -0,0 +1,209 @@
+"""
+QMP Message Format
+
+This module provides the `Message` class, which represents a single QMP
+message sent to or from the server.
+"""
+
+import json
+from json import JSONDecodeError
+from typing import (
+ Dict,
+ Iterator,
+ Mapping,
+ MutableMapping,
+ Optional,
+ Union,
+)
+
+from .error import ProtocolError
+
+
+class Message(MutableMapping[str, object]):
+ """
+ Represents a single QMP protocol message.
+
+ QMP uses JSON objects as its basic communicative unit; so this
+ Python object is a :py:obj:`~collections.abc.MutableMapping`. It may
+ be instantiated from either another mapping (like a `dict`), or from
+ raw `bytes` that still need to be deserialized.
+
+ Once instantiated, it may be treated like any other MutableMapping::
+
+ >>> msg = Message(b'{"hello": "world"}')
+ >>> assert msg['hello'] == 'world'
+ >>> msg['id'] = 'foobar'
+ >>> print(msg)
+ {
+ "hello": "world",
+ "id": "foobar"
+ }
+
+ It can be converted to `bytes`::
+
+ >>> msg = Message({"hello": "world"})
+ >>> print(bytes(msg))
+    b'{"hello":"world"}'
+
+ Or back into a garden-variety `dict`::
+
+ >>> dict(msg)
+ {'hello': 'world'}
+
+
+ :param value: Initial value, if any.
+ :param eager:
+ When `True`, attempt to serialize or deserialize the initial value
+ immediately, so that conversion exceptions are raised during
+ the call to ``__init__()``.
+ """
+ # pylint: disable=too-many-ancestors
+
+ def __init__(self,
+ value: Union[bytes, Mapping[str, object]] = b'{}', *,
+ eager: bool = True):
+ self._data: Optional[bytes] = None
+ self._obj: Optional[Dict[str, object]] = None
+
+ if isinstance(value, bytes):
+ self._data = value
+ if eager:
+ self._obj = self._deserialize(self._data)
+ else:
+ self._obj = dict(value)
+ if eager:
+ self._data = self._serialize(self._obj)
+
+ # Methods necessary to implement the MutableMapping interface, see:
+ # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping
+
+ # We get pop, popitem, clear, update, setdefault, __contains__,
+ # keys, items, values, get, __eq__ and __ne__ for free.
+
+ def __getitem__(self, key: str) -> object:
+ return self._object[key]
+
+ def __setitem__(self, key: str, value: object) -> None:
+ self._object[key] = value
+ self._data = None
+
+ def __delitem__(self, key: str) -> None:
+ del self._object[key]
+ self._data = None
+
+ def __iter__(self) -> Iterator[str]:
+ return iter(self._object)
+
+ def __len__(self) -> int:
+ return len(self._object)
+
+ # Dunder methods not related to MutableMapping:
+
+ def __repr__(self) -> str:
+ if self._obj is not None:
+ return f"Message({self._object!r})"
+ return f"Message({bytes(self)!r})"
+
+ def __str__(self) -> str:
+ """Pretty-printed representation of this QMP message."""
+ return json.dumps(self._object, indent=2)
+
+ def __bytes__(self) -> bytes:
+ """bytes representing this QMP message."""
+ if self._data is None:
+ self._data = self._serialize(self._obj or {})
+ return self._data
+
+ # Conversion Methods
+
+ @property
+ def _object(self) -> Dict[str, object]:
+ """
+ A `dict` representing this QMP message.
+
+ Generated on-demand, if required. This property is private
+ because it returns an object that could be used to invalidate
+ the internal state of the `Message` object.
+ """
+ if self._obj is None:
+ self._obj = self._deserialize(self._data or b'{}')
+ return self._obj
+
+ @classmethod
+ def _serialize(cls, value: object) -> bytes:
+ """
+ Serialize a JSON object as `bytes`.
+
+ :raise ValueError: When the object cannot be serialized.
+ :raise TypeError: When the object cannot be serialized.
+
+ :return: `bytes` ready to be sent over the wire.
+ """
+ return json.dumps(value, separators=(',', ':')).encode('utf-8')
+
+ @classmethod
+ def _deserialize(cls, data: bytes) -> Dict[str, object]:
+ """
+ Deserialize JSON `bytes` into a native Python `dict`.
+
+ :raise DeserializationError:
+ If JSON deserialization fails for any reason.
+ :raise UnexpectedTypeError:
+ If the data does not represent a JSON object.
+
+ :return: A `dict` representing this QMP message.
+ """
+ try:
+ obj = json.loads(data)
+ except JSONDecodeError as err:
+ emsg = "Failed to deserialize QMP message."
+ raise DeserializationError(emsg, data) from err
+ if not isinstance(obj, dict):
+ raise UnexpectedTypeError(
+ "QMP message is not a JSON object.",
+ obj
+ )
+ return obj
+
+
+class DeserializationError(ProtocolError):
+ """
+ A QMP message was not understood as JSON.
+
+ When this Exception is raised, ``__cause__`` will be set to the
+ `json.JSONDecodeError` Exception, which can be interrogated for
+ further details.
+
+ :param error_message: Human-readable string describing the error.
+ :param raw: The raw `bytes` that prompted the failure.
+ """
+ def __init__(self, error_message: str, raw: bytes):
+ super().__init__(error_message)
+ #: The raw `bytes` that were not understood as JSON.
+ self.raw: bytes = raw
+
+ def __str__(self) -> str:
+ return "\n".join([
+ super().__str__(),
+ f" raw bytes were: {str(self.raw)}",
+ ])
+
+
+class UnexpectedTypeError(ProtocolError):
+ """
+ A QMP message was JSON, but not a JSON object.
+
+ :param error_message: Human-readable string describing the error.
+ :param value: The deserialized JSON value that wasn't an object.
+ """
+ def __init__(self, error_message: str, value: object):
+ super().__init__(error_message)
+ #: The JSON value that was expected to be an object.
+ self.value: object = value
+
+ def __str__(self) -> str:
+ strval = json.dumps(self.value, indent=2)
+ return "\n".join([
+ super().__str__(),
+ f" json value was: {strval}",
+ ])
diff --git a/python/qemu/aqmp/models.py b/python/qemu/aqmp/models.py
new file mode 100644
index 000000000..de87f8780
--- /dev/null
+++ b/python/qemu/aqmp/models.py
@@ -0,0 +1,146 @@
+"""
+QMP Data Models
+
+This module provides simplistic data classes that represent the few
+structures that the QMP spec mandates; they are used to verify incoming
+data to make sure it conforms to spec.
+"""
+# pylint: disable=too-few-public-methods
+
+from collections import abc
+import copy
+from typing import (
+ Any,
+ Dict,
+ Mapping,
+ Optional,
+ Sequence,
+)
+
+
+class Model:
+ """
+ Abstract data model, representing some QMP object of some kind.
+
+ :param raw: The raw object to be validated.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ self._raw = raw
+
+ def _check_key(self, key: str) -> None:
+ if key not in self._raw:
+ raise KeyError(f"'{self._name}' object requires '{key}' member")
+
+ def _check_value(self, key: str, type_: type, typestr: str) -> None:
+ assert key in self._raw
+ if not isinstance(self._raw[key], type_):
+ raise TypeError(
+ f"'{self._name}' member '{key}' must be a {typestr}"
+ )
+
+ def _check_member(self, key: str, type_: type, typestr: str) -> None:
+ self._check_key(key)
+ self._check_value(key, type_, typestr)
+
+ @property
+ def _name(self) -> str:
+ return type(self).__name__
+
+ def __repr__(self) -> str:
+ return f"{self._name}({self._raw!r})"
+
+
+class Greeting(Model):
+ """
+ Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+
+ :param raw: The raw Greeting object.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ super().__init__(raw)
+ #: 'QMP' member
+ self.QMP: QMPGreeting # pylint: disable=invalid-name
+
+ self._check_member('QMP', abc.Mapping, "JSON object")
+ self.QMP = QMPGreeting(self._raw['QMP'])
+
+ def _asdict(self) -> Dict[str, object]:
+ """
+ For compatibility with the iotests sync QMP wrapper.
+
+ The legacy QMP interface needs Greetings as a garden-variety Dict.
+
+ This interface is private in the hopes that it will be able to
+ be dropped again in the near-future. Caller beware!
+ """
+ return dict(copy.deepcopy(self._raw))
+
+
+class QMPGreeting(Model):
+ """
+ Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+
+ :param raw: The raw QMPGreeting object.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ super().__init__(raw)
+ #: 'version' member
+ self.version: Mapping[str, object]
+ #: 'capabilities' member
+ self.capabilities: Sequence[object]
+
+ self._check_member('version', abc.Mapping, "JSON object")
+ self.version = self._raw['version']
+
+ self._check_member('capabilities', abc.Sequence, "JSON array")
+ self.capabilities = self._raw['capabilities']
+
+
+class ErrorResponse(Model):
+ """
+ Defined in qmp-spec.txt, section 2.4.2, "error".
+
+ :param raw: The raw ErrorResponse object.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ super().__init__(raw)
+ #: 'error' member
+ self.error: ErrorInfo
+ #: 'id' member
+ self.id: Optional[object] = None # pylint: disable=invalid-name
+
+ self._check_member('error', abc.Mapping, "JSON object")
+ self.error = ErrorInfo(self._raw['error'])
+
+ if 'id' in raw:
+ self.id = raw['id']
+
+
+class ErrorInfo(Model):
+ """
+ Defined in qmp-spec.txt, section 2.4.2, "error".
+
+ :param raw: The raw ErrorInfo object.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ super().__init__(raw)
+ #: 'class' member, with an underscore to avoid conflicts in Python.
+ self.class_: str
+ #: 'desc' member
+ self.desc: str
+
+ self._check_member('class', str, "string")
+ self.class_ = self._raw['class']
+
+ self._check_member('desc', str, "string")
+ self.desc = self._raw['desc']
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
new file mode 100644
index 000000000..5190b33b1
--- /dev/null
+++ b/python/qemu/aqmp/protocol.py
@@ -0,0 +1,917 @@
+"""
+Generic Asynchronous Message-based Protocol Support
+
+This module provides a generic framework for sending and receiving
+messages over an asyncio stream. `AsyncProtocol` is an abstract class
+that implements the core mechanisms of a simple send/receive protocol,
+and is designed to be extended.
+
+In this package, it is used as the implementation for the `QMPClient`
+class.
+"""
+
+import asyncio
+from asyncio import StreamReader, StreamWriter
+from enum import Enum
+from functools import wraps
+import logging
+from ssl import SSLContext
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Generic,
+ List,
+ Optional,
+ Tuple,
+ TypeVar,
+ Union,
+ cast,
+)
+
+from .error import AQMPError
+from .util import (
+ bottom_half,
+ create_task,
+ exception_summary,
+ flush,
+ is_closing,
+ pretty_traceback,
+ upper_half,
+ wait_closed,
+)
+
+
+T = TypeVar('T')
+_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None``
+_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
+
+
+class Runstate(Enum):
+ """Protocol session runstate."""
+
+ #: Fully quiesced and disconnected.
+ IDLE = 0
+ #: In the process of connecting or establishing a session.
+ CONNECTING = 1
+ #: Fully connected and active session.
+ RUNNING = 2
+ #: In the process of disconnecting.
+ #: Runstate may be returned to `IDLE` by calling `disconnect()`.
+ DISCONNECTING = 3
+
+
+class ConnectError(AQMPError):
+ """
+ Raised when the initial connection process has failed.
+
+ This Exception always wraps a "root cause" exception that can be
+ interrogated for additional information.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+ def __init__(self, error_message: str, exc: Exception):
+ super().__init__(error_message)
+ #: Human-readable error string
+ self.error_message: str = error_message
+ #: Wrapped root cause exception
+ self.exc: Exception = exc
+
+ def __str__(self) -> str:
+ cause = str(self.exc)
+ if not cause:
+ # If there's no error string, use the exception name.
+ cause = exception_summary(self.exc)
+ return f"{self.error_message}: {cause}"
+
+
+class StateError(AQMPError):
+ """
+ An API command (connect, execute, etc) was issued at an inappropriate time.
+
+ This error is raised when a command like
+ :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
+ time.
+
+ :param error_message: Human-readable string describing the state violation.
+ :param state: The actual `Runstate` seen at the time of the violation.
+ :param required: The `Runstate` required to process this command.
+ """
+ def __init__(self, error_message: str,
+ state: Runstate, required: Runstate):
+ super().__init__(error_message)
+ self.error_message = error_message
+ self.state = state
+ self.required = required
+
+
+F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name
+
+
+# Don't Panic.
+def require(required_state: Runstate) -> Callable[[F], F]:
+ """
+ Decorator: protect a method so it can only be run in a certain `Runstate`.
+
+ :param required_state: The `Runstate` required to invoke this method.
+ :raise StateError: When the required `Runstate` is not met.
+ """
+ def _decorator(func: F) -> F:
+ # _decorator is the decorator that is built by calling the
+ # require() decorator factory; e.g.:
+ #
+ # @require(Runstate.IDLE) def foo(): ...
+ # will replace 'foo' with the result of '_decorator(foo)'.
+
+ @wraps(func)
+ def _wrapper(proto: 'AsyncProtocol[Any]',
+ *args: Any, **kwargs: Any) -> Any:
+ # _wrapper is the function that gets executed prior to the
+ # decorated method.
+
+ name = type(proto).__name__
+
+ if proto.runstate != required_state:
+ if proto.runstate == Runstate.CONNECTING:
+ emsg = f"{name} is currently connecting."
+ elif proto.runstate == Runstate.DISCONNECTING:
+ emsg = (f"{name} is disconnecting."
+ " Call disconnect() to return to IDLE state.")
+ elif proto.runstate == Runstate.RUNNING:
+ emsg = f"{name} is already connected and running."
+ elif proto.runstate == Runstate.IDLE:
+ emsg = f"{name} is disconnected and idle."
+ else:
+ assert False
+ raise StateError(emsg, proto.runstate, required_state)
+ # No StateError, so call the wrapped method.
+ return func(proto, *args, **kwargs)
+
+ # Return the decorated method;
+ # Transforming Func to Decorated[Func].
+ return cast(F, _wrapper)
+
+ # Return the decorator instance from the decorator factory. Phew!
+ return _decorator
+
+
+class AsyncProtocol(Generic[T]):
+ """
+ AsyncProtocol implements a generic async message-based protocol.
+
+ This protocol assumes the basic unit of information transfer between
+ client and server is a "message", the details of which are left up
+ to the implementation. It assumes the sending and receiving of these
+ messages is full-duplex and not necessarily correlated; i.e. it
+ supports asynchronous inbound messages.
+
+ It is designed to be extended by a specific protocol which provides
+ the implementations for how to read and send messages. These must be
+ defined in `_do_recv()` and `_do_send()`, respectively.
+
+ Other callbacks have a default implementation, but are intended to be
+ either extended or overridden:
+
+ - `_establish_session`:
+ The base implementation starts the reader/writer tasks.
+ A protocol implementation can override this call, inserting
+ actions to be taken prior to starting the reader/writer tasks
+ before the super() call; actions needing to occur afterwards
+ can be written after the super() call.
+ - `_on_message`:
+ Actions to be performed when a message is received.
+ - `_cb_outbound`:
+ Logging/Filtering hook for all outbound messages.
+ - `_cb_inbound`:
+ Logging/Filtering hook for all inbound messages.
+ This hook runs *before* `_on_message()`.
+
+ :param name:
+ Name used for logging messages, if any. By default, messages
+ will log to 'qemu.aqmp.protocol', but each individual connection
+ can be given its own logger by giving it a name; messages will
+ then log to 'qemu.aqmp.protocol.${name}'.
+ """
+ # pylint: disable=too-many-instance-attributes
+
+ #: Logger object for debugging messages from this connection.
+ logger = logging.getLogger(__name__)
+
+ # Maximum allowable size of read buffer
+ _limit = (64 * 1024)
+
+ # -------------------------
+ # Section: Public interface
+ # -------------------------
+
+ def __init__(self, name: Optional[str] = None) -> None:
+ #: The nickname for this connection, if any.
+ self.name: Optional[str] = name
+ if self.name is not None:
+ self.logger = self.logger.getChild(self.name)
+
+ # stream I/O
+ self._reader: Optional[StreamReader] = None
+ self._writer: Optional[StreamWriter] = None
+
+ # Outbound Message queue
+ self._outgoing: asyncio.Queue[T]
+
+ # Special, long-running tasks:
+ self._reader_task: Optional[asyncio.Future[None]] = None
+ self._writer_task: Optional[asyncio.Future[None]] = None
+
+ # Aggregate of the above two tasks, used for Exception management.
+ self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None
+
+ #: Disconnect task. The disconnect implementation runs in a task
+ #: so that asynchronous disconnects (initiated by the
+ #: reader/writer) are allowed to wait for the reader/writers to
+ #: exit.
+ self._dc_task: Optional[asyncio.Future[None]] = None
+
+ self._runstate = Runstate.IDLE
+ self._runstate_changed: Optional[asyncio.Event] = None
+
+ def __repr__(self) -> str:
+ cls_name = type(self).__name__
+ tokens = []
+ if self.name is not None:
+ tokens.append(f"name={self.name!r}")
+ tokens.append(f"runstate={self.runstate.name}")
+ return f"<{cls_name} {' '.join(tokens)}>"
+
+ @property # @upper_half
+ def runstate(self) -> Runstate:
+ """The current `Runstate` of the connection."""
+ return self._runstate
+
+ @upper_half
+ async def runstate_changed(self) -> Runstate:
+ """
+ Wait for the `runstate` to change, then return that runstate.
+ """
+ await self._runstate_event.wait()
+ return self.runstate
+
+ @upper_half
+ @require(Runstate.IDLE)
+ async def accept(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Accept a connection and begin processing message queues.
+
+ If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+ :param address:
+ Address to listen to; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise StateError: When the `Runstate` is not `IDLE`.
+ :raise ConnectError: If a connection could not be accepted.
+ """
+ # Same machinery as connect(), just listening instead of dialing.
+ await self._new_session(address, ssl, accept=True)
+
+ @upper_half
+ @require(Runstate.IDLE)
+ async def connect(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Connect to the server and begin processing message queues.
+
+ If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+ :param address:
+ Address to connect to; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise StateError: When the `Runstate` is not `IDLE`.
+ :raise ConnectError: If a connection cannot be made to the server.
+ """
+ await self._new_session(address, ssl)
+
+ @upper_half
+ async def disconnect(self) -> None:
+ """
+ Disconnect and wait for all tasks to fully stop.
+
+ If there was an exception that caused the reader/writers to
+ terminate prematurely, it will be raised here.
+
+ :raise Exception: When the reader or writer terminate unexpectedly.
+ """
+ self.logger.debug("disconnect() called.")
+ # Idempotent: scheduling is a no-op if a disconnect is already underway.
+ self._schedule_disconnect()
+ await self._wait_disconnect()
+
+ # --------------------------
+ # Section: Session machinery
+ # --------------------------
+
+ @property
+ def _runstate_event(self) -> asyncio.Event:
+ # asyncio.Event() objects should not be created prior to entrance into
+ # an event loop, so we can ensure we create it in the correct context.
+ # Create it on-demand *only* at the behest of an 'async def' method.
+ if not self._runstate_changed:
+ self._runstate_changed = asyncio.Event()
+ return self._runstate_changed
+
+ @upper_half
+ @bottom_half
+ def _set_state(self, state: Runstate) -> None:
+ """
+ Change the `Runstate` of the protocol connection.
+
+ Signals the `runstate_changed` event.
+ """
+ if state == self._runstate:
+ return
+
+ self.logger.debug("Transitioning from '%s' to '%s'.",
+ str(self._runstate), str(state))
+ self._runstate = state
+ # Pulse the event: set() wakes every current waiter, and the
+ # immediate clear() re-arms it for the next transition.
+ self._runstate_event.set()
+ self._runstate_event.clear()
+
+ @upper_half
+ async def _new_session(self,
+ address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None,
+ accept: bool = False) -> None:
+ """
+ Establish a new connection and initialize the session.
+
+ Connect or accept a new connection, then begin the protocol
+ session machinery. If this call fails, `runstate` is guaranteed
+ to be set back to `IDLE`.
+
+ :param address:
+ Address to connect to/listen on;
+ UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+ :param accept: Accept a connection instead of connecting when `True`.
+
+ :raise ConnectError:
+ When a connection or session cannot be established.
+
+ This exception will wrap a more concrete one. In most cases,
+ the wrapped exception will be `OSError` or `EOFError`. If a
+ protocol-level failure occurs while establishing a new
+ session, the wrapped error may also be an `AQMPError`.
+ """
+ assert self.runstate == Runstate.IDLE
+
+ try:
+ # 'phase' labels which stage failed, for the error message below.
+ phase = "connection"
+ await self._establish_connection(address, ssl, accept)
+
+ phase = "session"
+ await self._establish_session()
+
+ except BaseException as err:
+ emsg = f"Failed to establish {phase}"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ try:
+ # Reset from CONNECTING back to IDLE.
+ await self.disconnect()
+ # Bare except is deliberate: a failure during cleanup is
+ # considered more severe than the original error, so it
+ # supersedes it after being logged as critical.
+ except:
+ emsg = "Unexpected bottom half exception"
+ self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ # NB: CancelledError is not a BaseException before Python 3.8
+ if isinstance(err, asyncio.CancelledError):
+ raise
+
+ if isinstance(err, Exception):
+ raise ConnectError(emsg, err) from err
+
+ # Raise BaseExceptions un-wrapped, they're more important.
+ raise
+
+ assert self.runstate == Runstate.RUNNING
+
+ @upper_half
+ async def _establish_connection(
+ self,
+ address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None,
+ accept: bool = False
+ ) -> None:
+ """
+ Establish a new connection.
+
+ Transitions the runstate from IDLE to CONNECTING, then delegates
+ to `_do_accept()` or `_do_connect()`.
+
+ :param address:
+ Address to connect to/listen on;
+ UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+ :param accept: Accept a connection instead of connecting when `True`.
+ """
+ assert self.runstate == Runstate.IDLE
+ self._set_state(Runstate.CONNECTING)
+
+ # Allow runstate watchers to witness 'CONNECTING' state; some
+ # failures in the streaming layer are synchronous and will not
+ # otherwise yield.
+ await asyncio.sleep(0)
+
+ if accept:
+ await self._do_accept(address, ssl)
+ else:
+ await self._do_connect(address, ssl)
+
+ @upper_half
+ async def _do_accept(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Acting as the transport server, accept a single connection.
+
+ Exactly one client is accepted (backlog=1) and the listening
+ server is torn down as soon as that connection arrives.
+
+ :param address:
+ Address to listen on; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise OSError: For stream-related errors.
+ """
+ self.logger.debug("Awaiting connection on %s ...", address)
+ connected = asyncio.Event()
+ server: Optional[asyncio.AbstractServer] = None
+
+ async def _client_connected_cb(reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter) -> None:
+ """Used to accept a single incoming connection, see below."""
+ nonlocal server
+ nonlocal connected
+
+ # A connection has been accepted; stop listening for new ones.
+ assert server is not None
+ server.close()
+ await server.wait_closed()
+ server = None
+
+ # Register this client as being connected
+ self._reader, self._writer = (reader, writer)
+
+ # Signal back: We've accepted a client!
+ connected.set()
+
+ # A tuple address means TCP host/port; a string means a UNIX socket path.
+ if isinstance(address, tuple):
+ coro = asyncio.start_server(
+ _client_connected_cb,
+ host=address[0],
+ port=address[1],
+ ssl=ssl,
+ backlog=1,
+ limit=self._limit,
+ )
+ else:
+ coro = asyncio.start_unix_server(
+ _client_connected_cb,
+ path=address,
+ ssl=ssl,
+ backlog=1,
+ limit=self._limit,
+ )
+
+ server = await coro # Starts listening
+ await connected.wait() # Waits for the callback to fire (and finish)
+ # The callback set 'server' back to None after closing it.
+ assert server is None
+
+ self.logger.debug("Connection accepted.")
+
+ @upper_half
+ async def _do_connect(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Acting as the transport client, initiate a connection to a server.
+
+ :param address:
+ Address to connect to; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise OSError: For stream-related errors.
+ """
+ self.logger.debug("Connecting to %s ...", address)
+
+ # A tuple address means TCP host/port; a string means a UNIX socket path.
+ if isinstance(address, tuple):
+ connect = asyncio.open_connection(
+ address[0],
+ address[1],
+ ssl=ssl,
+ limit=self._limit,
+ )
+ else:
+ connect = asyncio.open_unix_connection(
+ path=address,
+ ssl=ssl,
+ limit=self._limit,
+ )
+ self._reader, self._writer = await connect
+
+ self.logger.debug("Connected.")
+
+ @upper_half
+ async def _establish_session(self) -> None:
+ """
+ Establish a new session.
+
+ Starts the readers/writer tasks; subclasses may perform their
+ own negotiations here. The Runstate will be RUNNING upon
+ successful conclusion.
+ """
+ assert self.runstate == Runstate.CONNECTING
+
+ # Fresh queue per session: any messages from a prior session are gone.
+ self._outgoing = asyncio.Queue()
+
+ reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
+ writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
+
+ self._reader_task = create_task(reader_coro)
+ self._writer_task = create_task(writer_coro)
+
+ # Gathered so _wait_disconnect() can harvest both tasks' exceptions.
+ self._bh_tasks = asyncio.gather(
+ self._reader_task,
+ self._writer_task,
+ )
+
+ self._set_state(Runstate.RUNNING)
+ await asyncio.sleep(0) # Allow runstate_event to process
+
+ @upper_half
+ @bottom_half
+ def _schedule_disconnect(self) -> None:
+ """
+ Initiate a disconnect; idempotent.
+
+ This method is used both in the upper-half as a direct
+ consequence of `disconnect()`, and in the bottom-half in the
+ case of unhandled exceptions in the reader/writer tasks.
+
+ It can be invoked no matter what the `runstate` is.
+ """
+ # _dc_task doubles as the "already scheduled" flag.
+ if not self._dc_task:
+ self._set_state(Runstate.DISCONNECTING)
+ self.logger.debug("Scheduling disconnect.")
+ self._dc_task = create_task(self._bh_disconnect())
+
+ @upper_half
+ async def _wait_disconnect(self) -> None:
+ """
+ Waits for a previously scheduled disconnect to finish.
+
+ This method will gather any bottom half exceptions and re-raise
+ the one that occurred first; presuming it to be the root cause
+ of any subsequent Exceptions. It is intended to be used in the
+ upper half of the call chain.
+
+ :raise Exception:
+ Arbitrary exception re-raised on behalf of the reader/writer.
+ """
+ assert self.runstate == Runstate.DISCONNECTING
+ assert self._dc_task
+
+ # _bh_tasks goes first so its (earlier, root-cause) exception wins.
+ aws: List[Awaitable[object]] = [self._dc_task]
+ if self._bh_tasks:
+ aws.insert(0, self._bh_tasks)
+ all_defined_tasks = asyncio.gather(*aws)
+
+ # Ensure disconnect is done; Exception (if any) is not raised here:
+ await asyncio.wait((self._dc_task,))
+
+ try:
+ await all_defined_tasks # Raise Exceptions from the bottom half.
+ finally:
+ # Always return to IDLE, even when an exception propagates.
+ self._cleanup()
+ self._set_state(Runstate.IDLE)
+
+ @upper_half
+ def _cleanup(self) -> None:
+ """
+ Fully reset this object to a clean state and return to `IDLE`.
+ """
+ def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
+ # Help to erase a task, ENSURING it is fully quiesced first.
+ # Returns None for a finished task; a still-pending task is
+ # handed back unchanged (the assert above will have fired).
+ assert (task is None) or task.done()
+ return None if (task and task.done()) else task
+
+ assert self.runstate == Runstate.DISCONNECTING
+ self._dc_task = _paranoid_task_erase(self._dc_task)
+ self._reader_task = _paranoid_task_erase(self._reader_task)
+ self._writer_task = _paranoid_task_erase(self._writer_task)
+ self._bh_tasks = _paranoid_task_erase(self._bh_tasks)
+
+ self._reader = None
+ self._writer = None
+
+ # NB: _runstate_changed cannot be cleared because we still need it to
+ # send the final runstate changed event ...!
+
+ # ----------------------------
+ # Section: Bottom Half methods
+ # ----------------------------
+
+ @bottom_half
+ async def _bh_disconnect(self) -> None:
+ """
+ Disconnect and cancel all outstanding tasks.
+
+ It is designed to be called from its task context,
+ :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,
+ it is free to wait on any pending actions that may still need to
+ occur in either the reader or writer tasks.
+ """
+ assert self.runstate == Runstate.DISCONNECTING
+
+ def _done(task: Optional['asyncio.Future[Any]']) -> bool:
+ # True only for a task that exists AND has finished.
+ return task is not None and task.done()
+
+ # Are we already in an error pathway? If either of the tasks are
+ # already done, or if we have no tasks but a reader/writer; we
+ # must be.
+ #
+ # NB: We can't use _bh_tasks to check for premature task
+ # completion, because it may not yet have had a chance to run
+ # and gather itself.
+ tasks = tuple(filter(None, (self._writer_task, self._reader_task)))
+ error_pathway = _done(self._reader_task) or _done(self._writer_task)
+ if not tasks:
+ error_pathway |= bool(self._reader) or bool(self._writer)
+
+ try:
+ # Try to flush the writer, if possible.
+ # This *may* cause an error and force us over into the error path.
+ if not error_pathway:
+ await self._bh_flush_writer()
+ except BaseException as err:
+ error_pathway = True
+ emsg = "Failed to flush the writer"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+ finally:
+ # Cancel any still-running tasks (Won't raise):
+ if self._writer_task is not None and not self._writer_task.done():
+ self.logger.debug("Cancelling writer task.")
+ self._writer_task.cancel()
+ if self._reader_task is not None and not self._reader_task.done():
+ self.logger.debug("Cancelling reader task.")
+ self._reader_task.cancel()
+
+ # Close out the tasks entirely (Won't raise):
+ if tasks:
+ self.logger.debug("Waiting for tasks to complete ...")
+ await asyncio.wait(tasks)
+
+ # Lastly, close the stream itself. (*May raise*!):
+ await self._bh_close_stream(error_pathway)
+ self.logger.debug("Disconnected.")
+
+ @bottom_half
+ async def _bh_flush_writer(self) -> None:
+ # Drain the outbound queue, then flush the underlying StreamWriter,
+ # so that every queued message is actually on the wire before close.
+ # No-op if the writer task was never started.
+ if not self._writer_task:
+ return
+
+ self.logger.debug("Draining the outbound queue ...")
+ await self._outgoing.join()
+ if self._writer is not None:
+ self.logger.debug("Flushing the StreamWriter ...")
+ await flush(self._writer)
+
+ @bottom_half
+ async def _bh_close_stream(self, error_pathway: bool = False) -> None:
+ # Close the transport stream and wait for it to finish closing.
+ # :param error_pathway: True when a prior failure is already being
+ # reported; close errors are then logged and discarded.
+ # NB: Closing the writer also implicitly closes the reader.
+ if not self._writer:
+ return
+
+ if not is_closing(self._writer):
+ self.logger.debug("Closing StreamWriter.")
+ self._writer.close()
+
+ self.logger.debug("Waiting for StreamWriter to close ...")
+ try:
+ await wait_closed(self._writer)
+ except Exception: # pylint: disable=broad-except
+ # It's hard to tell if the Stream is already closed or
+ # not. Even if one of the tasks has failed, it may have
+ # failed for a higher-layered protocol reason. The
+ # stream could still be open and perfectly fine.
+ # I don't know how to discern its health here.
+
+ if error_pathway:
+ # We already know that *something* went wrong. Let's
+ # just trust that the Exception we already have is the
+ # better one to present to the user, even if we don't
+ # genuinely *know* the relationship between the two.
+ self.logger.debug(
+ "Discarding Exception from wait_closed:\n%s\n",
+ pretty_traceback(),
+ )
+ else:
+ # Oops, this is a brand-new error!
+ raise
+ finally:
+ self.logger.debug("StreamWriter closed.")
+
+ @bottom_half
+ async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
+ """
+ Run one of the bottom-half methods in a loop forever.
+
+ If the bottom half ever raises any exception, schedule a
+ disconnect that will terminate the entire loop.
+
+ :param async_fn: The bottom-half method to run in a loop.
+ :param name: The name of this task, used for logging.
+ """
+ try:
+ while True:
+ await async_fn()
+ except asyncio.CancelledError:
+ # We have been cancelled by _bh_disconnect, exit gracefully.
+ self.logger.debug("Task.%s: cancelled.", name)
+ return
+ except BaseException as err:
+ # EOF is an expected way for a session to end; log it quietly.
+ self.logger.log(
+ logging.INFO if isinstance(err, EOFError) else logging.ERROR,
+ "Task.%s: %s",
+ name, exception_summary(err)
+ )
+ self.logger.debug("Task.%s: failure:\n%s\n",
+ name, pretty_traceback())
+ self._schedule_disconnect()
+ raise
+ finally:
+ self.logger.debug("Task.%s: exiting.", name)
+
+ @bottom_half
+ async def _bh_send_message(self) -> None:
+ """
+ Wait for an outgoing message, then send it.
+
+ Designed to be run in `_bh_loop_forever()`.
+ """
+ msg = await self._outgoing.get()
+ try:
+ await self._send(msg)
+ finally:
+ # Always mark the item done so _outgoing.join() cannot hang.
+ self._outgoing.task_done()
+
+ @bottom_half
+ async def _bh_recv_message(self) -> None:
+ """
+ Wait for an incoming message and call `_on_message` to route it.
+
+ Designed to be run in `_bh_loop_forever()`.
+ """
+ msg = await self._recv()
+ await self._on_message(msg)
+
+ # --------------------
+ # Section: Message I/O
+ # --------------------
+
+ @upper_half
+ @bottom_half
+ def _cb_outbound(self, msg: T) -> T:
+ """
+ Callback: outbound message hook.
+
+ This is intended for subclasses to be able to add arbitrary
+ hooks to filter or manipulate outgoing messages. The base
+ implementation does nothing but log the message without any
+ manipulation of the message.
+
+ :param msg: raw outbound message
+ :return: final outbound message
+ """
+ self.logger.debug("--> %s", str(msg))
+ return msg
+
+ @upper_half
+ @bottom_half
+ def _cb_inbound(self, msg: T) -> T:
+ """
+ Callback: inbound message hook.
+
+ This is intended for subclasses to be able to add arbitrary
+ hooks to filter or manipulate incoming messages. The base
+ implementation does nothing but log the message without any
+ manipulation of the message.
+
+ This method does not "handle" incoming messages; it is a filter.
+ The actual "endpoint" for incoming messages is `_on_message()`.
+
+ :param msg: raw inbound message
+ :return: processed inbound message
+ """
+ self.logger.debug("<-- %s", str(msg))
+ return msg
+
+ @upper_half
+ @bottom_half
+ async def _readline(self) -> bytes:
+ """
+ Wait for a newline from the incoming reader.
+
+ This method is provided as a convenience for upper-layer
+ protocols, as many are line-based.
+
+ This method *may* return a sequence of bytes without a trailing
+ newline if EOF occurs, but *some* bytes were received. In this
+ case, the next call will raise `EOFError`. It is assumed that
+ the layer 5 protocol will decide if there is anything meaningful
+ to be done with a partial message.
+
+ :raise OSError: For stream-related errors.
+ :raise EOFError:
+ If the reader stream is at EOF and there are no bytes to return.
+ :return: bytes, including the newline.
+ """
+ assert self._reader is not None
+ msg_bytes = await self._reader.readline()
+
+ # Empty read + at_eof() means the peer hung up with nothing pending.
+ if not msg_bytes:
+ if self._reader.at_eof():
+ raise EOFError
+
+ return msg_bytes
+
+ @upper_half
+ @bottom_half
+ async def _do_recv(self) -> T:
+ """
+ Abstract: Read from the stream and return a message.
+
+ Very low-level; intended to only be called by `_recv()`.
+ Subclasses must override; the base raises `NotImplementedError`.
+ """
+ raise NotImplementedError
+
+ @upper_half
+ @bottom_half
+ async def _recv(self) -> T:
+ """
+ Read an arbitrary protocol message.
+
+ .. warning::
+ This method is intended primarily for `_bh_recv_message()`
+ to use in an asynchronous task loop. Using it outside of
+ this loop will "steal" messages from the normal routing
+ mechanism. It is safe to use prior to `_establish_session()`,
+ but should not be used otherwise.
+
+ This method uses `_do_recv()` to retrieve the raw message, and
+ then transforms it using `_cb_inbound()`.
+
+ :return: A single (filtered, processed) protocol message.
+ """
+ message = await self._do_recv()
+ return self._cb_inbound(message)
+
+ @upper_half
+ @bottom_half
+ def _do_send(self, msg: T) -> None:
+ """
+ Abstract: Write a message to the stream.
+
+ Very low-level; intended to only be called by `_send()`.
+ Subclasses must override; the base raises `NotImplementedError`.
+ """
+ raise NotImplementedError
+
+ @upper_half
+ @bottom_half
+ async def _send(self, msg: T) -> None:
+ """
+ Send an arbitrary protocol message.
+
+ This method will transform any outgoing messages according to
+ `_cb_outbound()`.
+
+ .. warning::
+ Like `_recv()`, this method is intended to be called by
+ the writer task loop that processes outgoing
+ messages. Calling it directly may circumvent logic
+ implemented by the caller meant to correlate outgoing and
+ incoming messages.
+
+ :raise OSError: For problems with the underlying stream.
+ """
+ msg = self._cb_outbound(msg)
+ # NOTE: _do_send is synchronous; data is buffered by the StreamWriter
+ # and flushed later (see _bh_flush_writer).
+ self._do_send(msg)
+
+ @bottom_half
+ async def _on_message(self, msg: T) -> None:
+ """
+ Called to handle the receipt of a new message.
+
+ .. caution::
+ This is executed from within the reader loop, so be advised
+ that waiting on either the reader or writer task will lead
+ to deadlock. Additionally, any unhandled exceptions will
+ directly cause the loop to halt, so logic may be best-kept
+ to a minimum if at all possible.
+
+ :param msg: The incoming message, already logged/filtered.
+ """
+ # Nothing to do in the abstract case.
diff --git a/python/qemu/aqmp/py.typed b/python/qemu/aqmp/py.typed
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/qemu/aqmp/py.typed
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
new file mode 100644
index 000000000..8105e29fa
--- /dev/null
+++ b/python/qemu/aqmp/qmp_client.py
@@ -0,0 +1,651 @@
+"""
+QMP Protocol Implementation
+
+This module provides the `QMPClient` class, which can be used to connect
+and send commands to a QMP server such as QEMU. The QMP class can be
+used to either connect to a listening server, or used to listen and
+accept an incoming connection from that server.
+"""
+
+import asyncio
+import logging
+import socket
+import struct
+from typing import (
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Union,
+ cast,
+)
+
+from .error import AQMPError, ProtocolError
+from .events import Events
+from .message import Message
+from .models import ErrorResponse, Greeting
+from .protocol import AsyncProtocol, Runstate, require
+from .util import (
+ bottom_half,
+ exception_summary,
+ pretty_traceback,
+ upper_half,
+)
+
+
+class _WrappedProtocolError(ProtocolError):
+ """
+ Abstract exception class for Protocol errors that wrap an Exception.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+ def __init__(self, error_message: str, exc: Exception):
+ super().__init__(error_message)
+ # The root-cause exception, preserved for callers to inspect.
+ self.exc = exc
+
+ def __str__(self) -> str:
+ # Render as "<error_message>: <root cause>".
+ return f"{self.error_message}: {self.exc!s}"
+
+
+class GreetingError(_WrappedProtocolError):
+ """
+ An exception occurred during the Greeting phase.
+
+ Raised by `QMPClient._get_greeting()` when the server's greeting
+ cannot be parsed or understood.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class NegotiationError(_WrappedProtocolError):
+ """
+ An exception occurred during the Negotiation phase.
+
+ Raised by `QMPClient._negotiate()` when capabilities negotiation
+ fails at the protocol level.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class ExecuteError(AQMPError):
+ """
+ Exception raised by `QMPClient.execute()` on RPC failure.
+
+ The exception message is the server-supplied error description.
+
+ :param error_response: The RPC error response object.
+ :param sent: The sent RPC message that caused the failure.
+ :param received: The raw RPC error reply received.
+ """
+ def __init__(self, error_response: ErrorResponse,
+ sent: Message, received: Message):
+ super().__init__(error_response.error.desc)
+ #: The sent `Message` that caused the failure
+ self.sent: Message = sent
+ #: The received `Message` that indicated failure
+ self.received: Message = received
+ #: The parsed error response
+ self.error: ErrorResponse = error_response
+ #: The QMP error class
+ self.error_class: str = error_response.error.class_
+
+
+class ExecInterruptedError(AQMPError):
+ """
+ Exception raised by `execute()` (et al) when an RPC is interrupted.
+
+ This error is raised when an `execute()` statement could not be
+ completed. This can occur because the connection itself was
+ terminated before a reply was received.
+
+ The true cause of the interruption will be available via `disconnect()`.
+ """
+
+
+class _MsgProtocolError(ProtocolError):
+ """
+ Abstract error class for protocol errors that have a `Message` object.
+
+ This Exception class is used for protocol errors where the `Message`
+ was mechanically understood, but was found to be inappropriate or
+ malformed.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+ def __init__(self, error_message: str, msg: Message):
+ super().__init__(error_message)
+ #: The received `Message` that caused the error.
+ self.msg: Message = msg
+
+ def __str__(self) -> str:
+ # Append the offending message to the base description.
+ return "\n".join([
+ super().__str__(),
+ f" Message was: {str(self.msg)}\n",
+ ])
+
+
+class ServerParseError(_MsgProtocolError):
+ """
+ The Server sent a `Message` indicating parsing failure.
+
+ i.e. A reply has arrived from the server, but it is missing the "ID"
+ field, indicating a parsing error.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+
+
+class BadReplyError(_MsgProtocolError):
+ """
+ An execution reply was successfully routed, but not understood.
+
+ If a QMP message is received with an 'id' field to allow it to be
+ routed, but is otherwise malformed, this exception will be raised.
+
+ A reply message is malformed if it is missing either the 'return' or
+ 'error' keys, or if the 'error' value has missing keys or members of
+ the wrong type.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The malformed reply that was received.
+ :param sent: The message that was sent that prompted the error.
+ """
+ def __init__(self, error_message: str, msg: Message, sent: Message):
+ super().__init__(error_message, msg)
+ #: The sent `Message` that caused the failure
+ self.sent = sent
+
+
+class QMPClient(AsyncProtocol[Message], Events):
+ """
+ Implements a QMP client connection.
+
+ QMP can be used to establish a connection as either the transport
+ client or server, though this class always acts as the QMP client.
+
+ :param name: Optional nickname for the connection, used for logging.
+
+ Basic script-style usage looks like this::
+
+ qmp = QMPClient('my_virtual_machine_name')
+ await qmp.connect(('127.0.0.1', 1234))
+ ...
+ res = await qmp.execute('query-block')
+ ...
+ await qmp.disconnect()
+
+ Basic async client-style usage looks like this::
+
+ class Client:
+ def __init__(self, name: str):
+ self.qmp = QMPClient(name)
+
+ async def watch_events(self):
+ try:
+ async for event in self.qmp.events:
+ print(f"Event: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ async def run(self, address='/tmp/qemu.socket'):
+ await self.qmp.connect(address)
+ asyncio.create_task(self.watch_events())
+ await self.qmp.runstate_changed()
+ await self.qmp.disconnect()
+
+ See `aqmp.events` for more detail on event handling patterns.
+ """
+ #: Logger object used for debugging messages.
+ logger = logging.getLogger(__name__)
+
+ # Read buffer limit; large enough to accept query-qmp-schema
+ _limit = (256 * 1024)
+
+ # Type alias for pending execute() result items
+ _PendingT = Union[Message, ExecInterruptedError]
+
+ def __init__(self, name: Optional[str] = None) -> None:
+ # NOTE(review): Events is initialized explicitly (not via the MRO)
+ # alongside AsyncProtocol's __init__ — presumably intentional given
+ # the dual inheritance; confirm against Events' own __init__.
+ super().__init__(name)
+ Events.__init__(self)
+
+ #: Whether or not to await a greeting after establishing a connection.
+ self.await_greeting: bool = True
+
+ #: Whether or not to perform capabilities negotiation upon connection.
+ #: Implies `await_greeting`.
+ self.negotiate: bool = True
+
+ # Cached Greeting, if one was awaited.
+ self._greeting: Optional[Greeting] = None
+
+ # Command ID counter
+ self._execute_id = 0
+
+ # Incoming RPC reply messages.
+ # Keyed by execution ID; the None key is for ID-less executions.
+ self._pending: Dict[
+ Union[str, None],
+ 'asyncio.Queue[QMPClient._PendingT]'
+ ] = {}
+
+ @property
+ def greeting(self) -> Optional[Greeting]:
+ """The `Greeting` from the QMP server, if any. None before connection or when `await_greeting` is disabled."""
+ return self._greeting
+
+ @upper_half
+ async def _establish_session(self) -> None:
+ """
+ Initiate the QMP session.
+
+ Wait for the QMP greeting and perform capabilities negotiation.
+
+ :raise GreetingError: When the greeting is not understood.
+ :raise NegotiationError: If the negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ # Reset per-session state before anything can fail.
+ self._greeting = None
+ self._pending = {}
+
+ if self.await_greeting or self.negotiate:
+ self._greeting = await self._get_greeting()
+
+ if self.negotiate:
+ await self._negotiate()
+
+ # This will start the reader/writers:
+ await super()._establish_session()
+
+ @upper_half
+ async def _get_greeting(self) -> Greeting:
+ """
+ Read and parse the QMP server's greeting message.
+
+ :raise GreetingError: When the greeting is not understood.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+
+ :return: the Greeting object given by the server.
+ """
+ self.logger.debug("Awaiting greeting ...")
+
+ try:
+ msg = await self._recv()
+ return Greeting(msg)
+ # KeyError/TypeError come from Greeting() model validation.
+ except (ProtocolError, KeyError, TypeError) as err:
+ emsg = "Did not understand Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise GreetingError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Failed to receive Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @upper_half
+ async def _negotiate(self) -> None:
+ """
+ Perform QMP capabilities negotiation.
+
+ Sends 'qmp_capabilities', requesting 'oob' only when the server's
+ greeting advertised it.
+
+ :raise NegotiationError: When negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ self.logger.debug("Negotiating capabilities ...")
+
+ arguments: Dict[str, List[str]] = {'enable': []}
+ if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+ arguments['enable'].append('oob')
+ msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
+
+ # It's not safe to use execute() here, because the reader/writers
+ # aren't running. AsyncProtocol *requires* that a new session
+ # does not fail after the reader/writers are running!
+ try:
+ await self._send(msg)
+ reply = await self._recv()
+ assert 'return' in reply
+ assert 'error' not in reply
+ except (ProtocolError, AssertionError) as err:
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise NegotiationError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @bottom_half
+ async def _bh_disconnect(self) -> None:
+ # Extends AsyncProtocol._bh_disconnect: after the transport is torn
+ # down, wake every pending execute() caller with
+ # ExecInterruptedError so none of them hangs forever.
+ try:
+ await super()._bh_disconnect()
+ finally:
+ if self._pending:
+ self.logger.debug("Cancelling pending executions")
+ keys = self._pending.keys()
+ for key in keys:
+ self.logger.debug("Cancelling execution '%s'", key)
+ # put_nowait is safe: each queue has capacity 1 and its
+ # sole reader (_reply) removes the entry once woken.
+ self._pending[key].put_nowait(
+ ExecInterruptedError("Disconnected")
+ )
+
+ self.logger.debug("QMP Disconnected.")
+
+ @upper_half
+ def _cleanup(self) -> None:
+ # Extends AsyncProtocol._cleanup; by this point every pending
+ # execution must have been answered or interrupted and removed.
+ super()._cleanup()
+ assert not self._pending
+
+ @bottom_half
+ async def _on_message(self, msg: Message) -> None:
+ """
+ Add an incoming message to the appropriate queue/handler.
+
+ Events go to the event dispatcher; replies are routed to the
+ pending-execution queue matching their 'id'; everything else is
+ dropped (or, for ID-less errors, treated as a server parse failure).
+
+ :raise ServerParseError: When Message indicates server parse failure.
+ """
+ # Incoming messages are not fully parsed/validated here;
+ # do only light peeking to know how to route the messages.
+
+ if 'event' in msg:
+ await self._event_dispatch(msg)
+ return
+
+ # Below, we assume everything left is an execute/exec-oob response.
+
+ exec_id = cast(Optional[str], msg.get('id'))
+
+ if exec_id in self._pending:
+ await self._pending[exec_id].put(msg)
+ return
+
+ # We have a message we can't route back to a caller.
+
+ is_error = 'error' in msg
+ has_id = 'id' in msg
+
+ if is_error and not has_id:
+ # This is very likely a server parsing error.
+ # It doesn't inherently belong to any pending execution.
+ # Instead of performing clever recovery, just terminate.
+ # See "NOTE" in qmp-spec.txt, section 2.4.2
+ raise ServerParseError(
+ ("Server sent an error response without an ID, "
+ "but there are no ID-less executions pending. "
+ "Assuming this is a server parser failure."),
+ msg
+ )
+
+ # qmp-spec.txt, section 2.4:
+ # 'Clients should drop all the responses
+ # that have an unknown "id" field.'
+ self.logger.log(
+ logging.ERROR if is_error else logging.WARNING,
+ "Unknown ID '%s', message dropped.",
+ exec_id,
+ )
+ self.logger.debug("Unroutable message: %s", str(msg))
+
+ @upper_half
+ @bottom_half
+ async def _do_recv(self) -> Message:
+ """
+ Read one newline-terminated JSON line and deserialize it eagerly.
+
+ :raise OSError: When a stream error is encountered.
+ :raise EOFError: When the stream is at EOF.
+ :raise ProtocolError:
+ When the Message is not understood.
+ See also `Message._deserialize`.
+
+ :return: A single QMP `Message`.
+ """
+ msg_bytes = await self._readline()
+ # eager=True forces deserialization now, so parse errors surface here.
+ msg = Message(msg_bytes, eager=True)
+ return msg
+
+ @upper_half
+ @bottom_half
+ def _do_send(self, msg: Message) -> None:
+ """
+ Serialize a `Message` and buffer it on the stream writer.
+
+ :raise ValueError: JSON serialization failure
+ :raise TypeError: JSON serialization failure
+ :raise OSError: When a stream error is encountered.
+ """
+ assert self._writer is not None
+ self._writer.write(bytes(msg))
+
+ @upper_half
+ def _get_exec_id(self) -> str:
+ # Mint a unique, monotonically-increasing execution ID, e.g.
+ # "__aqmp#00007". The "__aqmp#" prefix is reserved (see _execute()).
+ exec_id = f"__aqmp#{self._execute_id:05d}"
+ self._execute_id += 1
+ return exec_id
+
+ @upper_half
+ async def _issue(self, msg: Message) -> Union[None, str]:
+ """
+ Issue a QMP `Message` and do not wait for a reply.
+
+ Registers a reply queue under the message's ID (or None for an
+ ID-less message) before enqueueing it for the writer task.
+
+ :param msg: The QMP `Message` to send to the server.
+
+ :return: The ID of the `Message` sent.
+ """
+ msg_id: Optional[str] = None
+ if 'id' in msg:
+ assert isinstance(msg['id'], str)
+ msg_id = msg['id']
+
+ # maxsize=1: exactly one reply (or one interruption) per execution.
+ self._pending[msg_id] = asyncio.Queue(maxsize=1)
+ await self._outgoing.put(msg)
+
+ return msg_id
+
+ @upper_half
+ async def _reply(self, msg_id: Union[str, None]) -> Message:
+ """
+ Await a reply to a previously issued QMP message.
+
+ :param msg_id: The ID of the previously issued message.
+
+ :return: The reply from the server.
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ """
+ queue = self._pending[msg_id]
+ result = await queue.get()
+
+ try:
+ if isinstance(result, ExecInterruptedError):
+ raise result
+ return result
+ finally:
+ del self._pending[msg_id]
+
+ @upper_half
+ async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
+ """
+ Send a QMP `Message` to the server and await a reply.
+
+ This method *assumes* you are sending some kind of an execute
+ statement that *will* receive a reply.
+
+ An execution ID will be assigned if assign_id is `True`. It can be
+ disabled, but this requires that an ID is manually assigned
+ instead. For manually assigned IDs, you must not use the string
+ '__aqmp#' anywhere in the ID.
+
+ :param msg: The QMP `Message` to execute.
+ :param assign_id: If True, assign a new execution ID.
+
+ :return: Execution reply from the server.
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ """
+ if assign_id:
+ msg['id'] = self._get_exec_id()
+ elif 'id' in msg:
+ assert isinstance(msg['id'], str)
+ assert '__aqmp#' not in msg['id']
+
+ exec_id = await self._issue(msg)
+ return await self._reply(exec_id)
+
+ @upper_half
+ @require(Runstate.RUNNING)
+ async def _raw(
+ self,
+ msg: Union[Message, Mapping[str, object], bytes],
+ assign_id: bool = True,
+ ) -> Message:
+ """
+ Issue a raw `Message` to the QMP server and await a reply.
+
+ :param msg:
+ A Message to send to the server. It may be a `Message`, any
+ Mapping (including Dict), or raw bytes.
+ :param assign_id:
+ Assign an arbitrary execution ID to this message. If
+ `False`, the existing id must either be absent (and no other
+ such pending execution may omit an ID) or a string. If it is
+ a string, it must not start with '__aqmp#' and no other such
+ pending execution may currently be using that ID.
+
+ :return: Execution reply from the server.
+
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ :raise TypeError:
+ When assign_id is `False`, an ID is given, and it is not a string.
+ :raise ValueError:
+ When assign_id is `False`, but the ID is not usable;
+ Either because it starts with '__aqmp#' or it is already in-use.
+ """
+ # 1. convert generic Mapping or bytes to a QMP Message
+ # 2. copy Message objects so that we assign an ID only to the copy.
+ msg = Message(msg)
+
+ exec_id = msg.get('id')
+ if not assign_id and 'id' in msg:
+ if not isinstance(exec_id, str):
+ raise TypeError(f"ID ('{exec_id}') must be a string.")
+ if exec_id.startswith('__aqmp#'):
+ raise ValueError(
+ f"ID ('{exec_id}') must not start with '__aqmp#'."
+ )
+
+ if not assign_id and exec_id in self._pending:
+ raise ValueError(
+ f"ID '{exec_id}' is in-use and cannot be used."
+ )
+
+ return await self._execute(msg, assign_id=assign_id)
+
+ @upper_half
+ @require(Runstate.RUNNING)
+ async def execute_msg(self, msg: Message) -> object:
+ """
+ Execute a QMP command and return its value.
+
+ :param msg: The QMP `Message` to execute.
+
+ :return:
+ The command execution return value from the server. The type of
+ object returned depends on the command that was issued,
+ though most in QEMU return a `dict`.
+ :raise ValueError:
+ If the QMP `Message` does not have either the 'execute' or
+ 'exec-oob' fields set.
+ :raise ExecuteError: When the server returns an error response.
+ :raise ExecInterruptedError: if the connection was terminated early.
+ """
+ if not ('execute' in msg or 'exec-oob' in msg):
+ raise ValueError("Requires 'execute' or 'exec-oob' message")
+
+ # Copy the Message so that the ID assigned by _execute() is
+ # local to this method; allowing the ID to be seen in raised
+ # Exceptions but without modifying the caller's held copy.
+ msg = Message(msg)
+ reply = await self._execute(msg)
+
+ if 'error' in reply:
+ try:
+ error_response = ErrorResponse(reply)
+ except (KeyError, TypeError) as err:
+ # Error response was malformed.
+ raise BadReplyError(
+ "QMP error reply is malformed", reply, msg,
+ ) from err
+
+ raise ExecuteError(error_response, msg, reply)
+
+ if 'return' not in reply:
+ raise BadReplyError(
+ "QMP reply is missing a 'error' or 'return' member",
+ reply, msg,
+ )
+
+ return reply['return']
+
+ @classmethod
+ def make_execute_msg(cls, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> Message:
+ """
+ Create an executable message to be sent by `execute_msg` later.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If `True`, execute "out of band".
+
+ :return: An executable QMP `Message`.
+ """
+ msg = Message({'exec-oob' if oob else 'execute': cmd})
+ if arguments is not None:
+ msg['arguments'] = arguments
+ return msg
+
+ @upper_half
+ async def execute(self, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> object:
+ """
+ Execute a QMP command and return its value.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If `True`, execute "out of band".
+
+ :return:
+ The command execution return value from the server. The type of
+ object returned depends on the command that was issued,
+ though most in QEMU return a `dict`.
+ :raise ExecuteError: When the server returns an error response.
+ :raise ExecInterruptedError: if the connection was terminated early.
+ """
+ msg = self.make_execute_msg(cmd, arguments, oob=oob)
+ return await self.execute_msg(msg)
+
+ @upper_half
+ @require(Runstate.RUNNING)
+ def send_fd_scm(self, fd: int) -> None:
+ """
+ Send a file descriptor to the remote via SCM_RIGHTS.
+ """
+ assert self._writer is not None
+ sock = self._writer.transport.get_extra_info('socket')
+
+ if sock.family != socket.AF_UNIX:
+ raise AQMPError("Sending file descriptors requires a UNIX socket.")
+
+ if not hasattr(sock, 'sendmsg'):
+ # We need to void the warranty sticker.
+ # Access to sendmsg is scheduled for removal in Python 3.11.
+ # Find the real backing socket to use it anyway.
+ sock = sock._sock # pylint: disable=protected-access
+
+ sock.sendmsg(
+ [b' '],
+ [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
+ )
diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
new file mode 100644
index 000000000..eaa5fc7d5
--- /dev/null
+++ b/python/qemu/aqmp/util.py
@@ -0,0 +1,217 @@
+"""
+Miscellaneous Utilities
+
+This module provides asyncio utilities and compatibility wrappers for
+Python 3.6 to provide some features that otherwise become available in
+Python 3.7+.
+
+Various logging and debugging utilities are also provided, such as
+`exception_summary()` and `pretty_traceback()`, used primarily for
+adding information into the logging stream.
+"""
+
+import asyncio
+import sys
+import traceback
+from typing import (
+ Any,
+ Coroutine,
+ Optional,
+ TypeVar,
+ cast,
+)
+
+
+T = TypeVar('T')
+
+
+# --------------------------
+# Section: Utility Functions
+# --------------------------
+
+
+async def flush(writer: asyncio.StreamWriter) -> None:
+ """
+ Utility function to ensure a StreamWriter is *fully* drained.
+
+ `asyncio.StreamWriter.drain` only promises we will return to below
+ the "high-water mark". This function ensures we flush the entire
+ buffer -- by setting the high water mark to 0 and then calling
+ drain. The flow control limits are restored after the call is
+ completed.
+ """
+ transport = cast(asyncio.WriteTransport, writer.transport)
+
+ # https://github.com/python/typeshed/issues/5779
+ low, high = transport.get_write_buffer_limits() # type: ignore
+ transport.set_write_buffer_limits(0, 0)
+ try:
+ await writer.drain()
+ finally:
+ transport.set_write_buffer_limits(high, low)
+
+
+def upper_half(func: T) -> T:
+ """
+ Do-nothing decorator that annotates a method as an "upper-half" method.
+
+ These methods must not call bottom-half functions directly, but can
+ schedule them to run.
+ """
+ return func
+
+
+def bottom_half(func: T) -> T:
+ """
+ Do-nothing decorator that annotates a method as a "bottom-half" method.
+
+ These methods must take great care to handle their own exceptions whenever
+ possible. If they go unhandled, they will cause termination of the loop.
+
+ These methods do not, in general, have the ability to directly
+ report information to a caller’s context and will usually be
+ collected as a Task result instead.
+
+ They must not call upper-half functions directly.
+ """
+ return func
+
+
+# -------------------------------
+# Section: Compatibility Wrappers
+# -------------------------------
+
+
+def create_task(coro: Coroutine[Any, Any, T],
+ loop: Optional[asyncio.AbstractEventLoop] = None
+ ) -> 'asyncio.Future[T]':
+ """
+ Python 3.6-compatible `asyncio.create_task` wrapper.
+
+ :param coro: The coroutine to execute in a task.
+ :param loop: Optionally, the loop to create the task in.
+
+ :return: An `asyncio.Future` object.
+ """
+ if sys.version_info >= (3, 7):
+ if loop is not None:
+ return loop.create_task(coro)
+ return asyncio.create_task(coro) # pylint: disable=no-member
+
+ # Python 3.6:
+ return asyncio.ensure_future(coro, loop=loop)
+
+
+def is_closing(writer: asyncio.StreamWriter) -> bool:
+ """
+ Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
+
+ :param writer: The `asyncio.StreamWriter` object.
+ :return: `True` if the writer is closing, or closed.
+ """
+ if sys.version_info >= (3, 7):
+ return writer.is_closing()
+
+ # Python 3.6:
+ transport = writer.transport
+ assert isinstance(transport, asyncio.WriteTransport)
+ return transport.is_closing()
+
+
+async def wait_closed(writer: asyncio.StreamWriter) -> None:
+ """
+ Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
+
+ :param writer: The `asyncio.StreamWriter` to wait on.
+ """
+ if sys.version_info >= (3, 7):
+ await writer.wait_closed()
+ return
+
+ # Python 3.6
+ transport = writer.transport
+ assert isinstance(transport, asyncio.WriteTransport)
+
+ while not transport.is_closing():
+ await asyncio.sleep(0)
+
+ # This is an ugly workaround, but it's the best I can come up with.
+ sock = transport.get_extra_info('socket')
+
+ if sock is None:
+ # Our transport doesn't have a socket? ...
+ # Nothing we can reasonably do.
+ return
+
+ while sock.fileno() != -1:
+ await asyncio.sleep(0)
+
+
+def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
+ """
+ Python 3.6-compatible `asyncio.run` wrapper.
+
+ :param coro: A coroutine to execute now.
+ :return: The return value from the coroutine.
+ """
+ if sys.version_info >= (3, 7):
+ return asyncio.run(coro, debug=debug)
+
+ # Python 3.6
+ loop = asyncio.get_event_loop()
+ loop.set_debug(debug)
+ ret = loop.run_until_complete(coro)
+ loop.close()
+
+ return ret
+
+
+# ----------------------------
+# Section: Logging & Debugging
+# ----------------------------
+
+
+def exception_summary(exc: BaseException) -> str:
+ """
+ Return a summary string of an arbitrary exception.
+
+ It will be of the form "ExceptionType: Error Message", if the error
+ string is non-empty, and just "ExceptionType" otherwise.
+ """
+ name = type(exc).__qualname__
+ smod = type(exc).__module__
+ if smod not in ("__main__", "builtins"):
+ name = smod + '.' + name
+
+ error = str(exc)
+ if error:
+ return f"{name}: {error}"
+ return name
+
+
+def pretty_traceback(prefix: str = " | ") -> str:
+ """
+ Formats the current traceback, indented to provide visual distinction.
+
+ This is useful for printing a traceback within a traceback for
+ debugging purposes when encapsulating errors to deliver them up the
+ stack; when those errors are printed, this helps provide a nice
+ visual grouping to quickly identify the parts of the error that
+ belong to the inner exception.
+
+ :param prefix: The prefix to append to each line of the traceback.
+ :return: A string, formatted something like the following::
+
+ | Traceback (most recent call last):
+ | File "foobar.py", line 42, in arbitrary_example
+ | foo.baz()
+ | ArbitraryError: [Errno 42] Something bad happened!
+ """
+ output = "".join(traceback.format_exception(*sys.exc_info()))
+
+ exc_lines = []
+ for line in output.split('\n'):
+ exc_lines.append(prefix + line)
+
+ # The last line is always empty, omit it
+ return "\n".join(exc_lines[:-1])
diff --git a/python/qemu/machine/README.rst b/python/qemu/machine/README.rst
new file mode 100644
index 000000000..8de2c3d77
--- /dev/null
+++ b/python/qemu/machine/README.rst
@@ -0,0 +1,9 @@
+qemu.machine package
+====================
+
+This package provides core utilities used for testing and debugging
+QEMU. It is used by the iotests, vm tests, avocado tests, and several
+other utilities in the ./scripts directory. It is not a fully-fledged
+SDK and it is subject to change at any time.
+
+See the documentation in ``__init__.py`` for more information.
diff --git a/python/qemu/machine/__init__.py b/python/qemu/machine/__init__.py
new file mode 100644
index 000000000..9ccd58ef1
--- /dev/null
+++ b/python/qemu/machine/__init__.py
@@ -0,0 +1,36 @@
+"""
+QEMU development and testing library.
+
+This library provides a few high-level classes for driving QEMU from a
+test suite, not intended for production use.
+
+ | QEMUQtestProtocol: send/receive qtest messages.
+ | QEMUMachine: Configure and Boot a QEMU VM
+ | +-- QEMUQtestMachine: VM class, with a qtest socket.
+
+"""
+
+# Copyright (C) 2020-2021 John Snow for Red Hat Inc.
+# Copyright (C) 2015-2016 Red Hat Inc.
+# Copyright (C) 2012 IBM Corp.
+#
+# Authors:
+# John Snow <jsnow@redhat.com>
+# Fam Zheng <fam@euphon.net>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+# pylint: disable=import-error
+# see: https://github.com/PyCQA/pylint/issues/3624
+# see: https://github.com/PyCQA/pylint/issues/3651
+from .machine import QEMUMachine
+from .qtest import QEMUQtestMachine, QEMUQtestProtocol
+
+
+__all__ = (
+ 'QEMUMachine',
+ 'QEMUQtestProtocol',
+ 'QEMUQtestMachine',
+)
diff --git a/python/qemu/machine/console_socket.py b/python/qemu/machine/console_socket.py
new file mode 100644
index 000000000..8c4ff598a
--- /dev/null
+++ b/python/qemu/machine/console_socket.py
@@ -0,0 +1,129 @@
+"""
+QEMU Console Socket Module:
+
+This python module implements a ConsoleSocket object,
+which can drain a socket and optionally dump the bytes to file.
+"""
+# Copyright 2020 Linaro
+#
+# Authors:
+# Robert Foley <robert.foley@linaro.org>
+#
+# This code is licensed under the GPL version 2 or later. See
+# the COPYING file in the top-level directory.
+#
+
+from collections import deque
+import socket
+import threading
+import time
+from typing import Deque, Optional
+
+
+class ConsoleSocket(socket.socket):
+    """
+    ConsoleSocket represents a socket attached to a char device.
+
+    Optionally (if drain==True), drains the socket and places the bytes
+    into an in memory buffer for later processing.
+
+    Optionally a file path can be passed in and we will also
+    dump the characters to this file for debugging purposes.
+    """
+    def __init__(self, address: str, file: Optional[str] = None,
+                 drain: bool = False):
+        self._recv_timeout_sec = 300.0
+        self._sleep_time = 0.5
+        self._buffer: Deque[int] = deque()
+        socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
+        self.connect(address)
+        self._logfile = None
+        if file:
+            # pylint: disable=consider-using-with
+            self._logfile = open(file, "bw")
+        self._open = True
+        self._drain_thread = None
+        if drain:
+            self._drain_thread = self._thread_start()
+
+    def __repr__(self) -> str:
+        tmp = super().__repr__()
+        tmp = tmp.rstrip(">")
+        tmp = "%s, logfile=%s, drain_thread=%s>" % (tmp, self._logfile,
+                                                    self._drain_thread)
+        return tmp
+
+    def _drain_fn(self) -> None:
+        """Drains the socket and runs while the socket is open."""
+        while self._open:
+            try:
+                self._drain_socket()
+            except socket.timeout:
+                # The socket is expected to timeout since we set a
+                # short timeout to allow the thread to exit when
+                # self._open is set to False.
+                time.sleep(self._sleep_time)
+
+    def _thread_start(self) -> threading.Thread:
+        """Kick off a thread to drain the socket."""
+        # Configure socket to not block and timeout.
+        # This allows our drain thread to not block
+        # on receive and exit smoothly.
+        socket.socket.setblocking(self, False)
+        socket.socket.settimeout(self, 1)
+        drain_thread = threading.Thread(target=self._drain_fn)
+        drain_thread.daemon = True
+        drain_thread.start()
+        return drain_thread
+
+    def close(self) -> None:
+        """Close the base object and wait for the thread to terminate"""
+        if self._open:
+            self._open = False
+            if self._drain_thread is not None:
+                thread, self._drain_thread = self._drain_thread, None
+                thread.join()
+            socket.socket.close(self)
+            if self._logfile:
+                self._logfile.close()
+                self._logfile = None
+
+    def _drain_socket(self) -> None:
+        """Process arriving bytes into the in-memory _buffer."""
+        data = socket.socket.recv(self, 1)
+        if self._logfile:
+            self._logfile.write(data)
+            self._logfile.flush()
+        self._buffer.extend(data)
+
+    def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
+        """Return chars from in memory buffer.
+        Maintains the same API as socket.socket.recv.
+        """
+        if self._drain_thread is None:
+            # Not buffering the socket, pass thru to socket.
+            return socket.socket.recv(self, bufsize, flags)
+        assert not flags, "Cannot pass flags to recv() in drained mode"
+        start_time = time.time()
+        while len(self._buffer) < bufsize:
+            time.sleep(self._sleep_time)
+            elapsed_sec = time.time() - start_time
+            if elapsed_sec > self._recv_timeout_sec:
+                raise socket.timeout
+        return bytes((self._buffer.popleft() for i in range(bufsize)))
+
+    def setblocking(self, value: bool) -> None:
+        """When not draining we pass thru to the socket,
+        since when draining we control socket blocking.
+        """
+        if self._drain_thread is None:
+            socket.socket.setblocking(self, value)
+
+    def settimeout(self, value: Optional[float]) -> None:
+        """When not draining we pass thru to the socket,
+        since when draining we control the timeout.
+        """
+        if value is not None:
+            self._recv_timeout_sec = value
+        if self._drain_thread is None:
+            socket.socket.settimeout(self, value)
diff --git a/python/qemu/machine/machine.py b/python/qemu/machine/machine.py
new file mode 100644
index 000000000..67ab06ca2
--- /dev/null
+++ b/python/qemu/machine/machine.py
@@ -0,0 +1,837 @@
+"""
+QEMU machine module:
+
+The machine module primarily provides the QEMUMachine class,
+which provides facilities for managing the lifetime of a QEMU VM.
+"""
+
+# Copyright (C) 2015-2016 Red Hat Inc.
+# Copyright (C) 2012 IBM Corp.
+#
+# Authors:
+# Fam Zheng <famz@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+# Based on qmp.py.
+#
+
+import errno
+from itertools import chain
+import locale
+import logging
+import os
+import shutil
+import signal
+import socket
+import subprocess
+import tempfile
+from types import TracebackType
+from typing import (
+ Any,
+ BinaryIO,
+ Dict,
+ List,
+ Optional,
+ Sequence,
+ Tuple,
+ Type,
+ TypeVar,
+)
+
+from qemu.qmp import ( # pylint: disable=import-error
+ QMPMessage,
+ QMPReturnValue,
+ SocketAddrT,
+)
+
+from . import console_socket
+
+
+if os.environ.get('QEMU_PYTHON_LEGACY_QMP'):
+ from qemu.qmp import QEMUMonitorProtocol
+else:
+ from qemu.aqmp.legacy import QEMUMonitorProtocol
+
+
+LOG = logging.getLogger(__name__)
+
+
+class QEMUMachineError(Exception):
+ """
+ Exception called when an error in QEMUMachine happens.
+ """
+
+
+class QEMUMachineAddDeviceError(QEMUMachineError):
+ """
+ Exception raised when a request to add a device can not be fulfilled
+
+ The failures are caused by limitations, lack of information or conflicting
+ requests on the QEMUMachine methods. This exception does not represent
+ failures reported by the QEMU binary itself.
+ """
+
+
+class AbnormalShutdown(QEMUMachineError):
+ """
+ Exception raised when a graceful shutdown was requested, but not performed.
+ """
+
+
+_T = TypeVar('_T', bound='QEMUMachine')
+
+
+class QEMUMachine:
+ """
+ A QEMU VM.
+
+ Use this object as a context manager to ensure
+ the QEMU process terminates::
+
+ with VM(binary) as vm:
+ ...
+ # vm is guaranteed to be shut down here
+ """
+ # pylint: disable=too-many-instance-attributes, too-many-public-methods
+
+ def __init__(self,
+ binary: str,
+ args: Sequence[str] = (),
+ wrapper: Sequence[str] = (),
+ name: Optional[str] = None,
+ base_temp_dir: str = "/var/tmp",
+ monitor_address: Optional[SocketAddrT] = None,
+ sock_dir: Optional[str] = None,
+ drain_console: bool = False,
+ console_log: Optional[str] = None,
+ log_dir: Optional[str] = None,
+ qmp_timer: Optional[float] = None):
+ '''
+ Initialize a QEMUMachine
+
+ @param binary: path to the qemu binary
+ @param args: list of extra arguments
+ @param wrapper: list of arguments used as prefix to qemu binary
+ @param name: prefix for socket and log file names (default: qemu-PID)
+ @param base_temp_dir: default location where temp files are created
+ @param monitor_address: address for QMP monitor
+ @param sock_dir: where to create socket (defaults to base_temp_dir)
+ @param drain_console: (optional) True to drain console socket to buffer
+ @param console_log: (optional) path to console log file
+ @param log_dir: where to create and keep log files
+ @param qmp_timer: (optional) default QMP socket timeout
+ @note: Qemu process is not started until launch() is used.
+ '''
+ # pylint: disable=too-many-arguments
+
+ # Direct user configuration
+
+ self._binary = binary
+ self._args = list(args)
+ self._wrapper = wrapper
+ self._qmp_timer = qmp_timer
+
+ self._name = name or f"qemu-{os.getpid()}-{id(self):02x}"
+ self._temp_dir: Optional[str] = None
+ self._base_temp_dir = base_temp_dir
+ self._sock_dir = sock_dir
+ self._log_dir = log_dir
+
+ if monitor_address is not None:
+ self._monitor_address = monitor_address
+ else:
+ self._monitor_address = os.path.join(
+ self.sock_dir, f"{self._name}-monitor.sock"
+ )
+
+ self._console_log_path = console_log
+ if self._console_log_path:
+ # In order to log the console, buffering needs to be enabled.
+ self._drain_console = True
+ else:
+ self._drain_console = drain_console
+
+ # Runstate
+ self._qemu_log_path: Optional[str] = None
+ self._qemu_log_file: Optional[BinaryIO] = None
+ self._popen: Optional['subprocess.Popen[bytes]'] = None
+ self._events: List[QMPMessage] = []
+ self._iolog: Optional[str] = None
+ self._qmp_set = True # Enable QMP monitor by default.
+ self._qmp_connection: Optional[QEMUMonitorProtocol] = None
+ self._qemu_full_args: Tuple[str, ...] = ()
+ self._launched = False
+ self._machine: Optional[str] = None
+ self._console_index = 0
+ self._console_set = False
+ self._console_device_type: Optional[str] = None
+ self._console_address = os.path.join(
+ self.sock_dir, f"{self._name}-console.sock"
+ )
+ self._console_socket: Optional[socket.socket] = None
+ self._remove_files: List[str] = []
+ self._user_killed = False
+ self._quit_issued = False
+
+ def __enter__(self: _T) -> _T:
+ return self
+
+ def __exit__(self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType]) -> None:
+ self.shutdown()
+
+ def add_monitor_null(self) -> None:
+ """
+ This can be used to add an unused monitor instance.
+ """
+ self._args.append('-monitor')
+ self._args.append('null')
+
+ def add_fd(self: _T, fd: int, fdset: int,
+ opaque: str, opts: str = '') -> _T:
+ """
+ Pass a file descriptor to the VM
+ """
+ options = ['fd=%d' % fd,
+ 'set=%d' % fdset,
+ 'opaque=%s' % opaque]
+ if opts:
+ options.append(opts)
+
+ # This did not exist before 3.4, but since then it is
+ # mandatory for our purpose
+ if hasattr(os, 'set_inheritable'):
+ os.set_inheritable(fd, True)
+
+ self._args.append('-add-fd')
+ self._args.append(','.join(options))
+ return self
+
+ def send_fd_scm(self, fd: Optional[int] = None,
+ file_path: Optional[str] = None) -> int:
+ """
+ Send an fd or file_path to the remote via SCM_RIGHTS.
+
+ Exactly one of fd and file_path must be given. If it is
+ file_path, the file will be opened read-only and the new file
+ descriptor will be sent to the remote.
+ """
+ if file_path is not None:
+ assert fd is None
+ with open(file_path, "rb") as passfile:
+ fd = passfile.fileno()
+ self._qmp.send_fd_scm(fd)
+ else:
+ assert fd is not None
+ self._qmp.send_fd_scm(fd)
+
+ return 0
+
+ @staticmethod
+ def _remove_if_exists(path: str) -> None:
+ """
+ Remove file object at path if it exists
+ """
+ try:
+ os.remove(path)
+ except OSError as exception:
+ if exception.errno == errno.ENOENT:
+ return
+ raise
+
+ def is_running(self) -> bool:
+ """Returns true if the VM is running."""
+ return self._popen is not None and self._popen.poll() is None
+
+ @property
+ def _subp(self) -> 'subprocess.Popen[bytes]':
+ if self._popen is None:
+ raise QEMUMachineError('Subprocess pipe not present')
+ return self._popen
+
+ def exitcode(self) -> Optional[int]:
+ """Returns the exit code if possible, or None."""
+ if self._popen is None:
+ return None
+ return self._popen.poll()
+
+ def get_pid(self) -> Optional[int]:
+ """Returns the PID of the running process, or None."""
+ if not self.is_running():
+ return None
+ return self._subp.pid
+
+ def _load_io_log(self) -> None:
+ # Assume that the output encoding of QEMU's terminal output is
+ # defined by our locale. If indeterminate, allow open() to fall
+ # back to the platform default.
+ _, encoding = locale.getlocale()
+ if self._qemu_log_path is not None:
+ with open(self._qemu_log_path, "r", encoding=encoding) as iolog:
+ self._iolog = iolog.read()
+
+ @property
+ def _base_args(self) -> List[str]:
+ args = ['-display', 'none', '-vga', 'none']
+
+ if self._qmp_set:
+ if isinstance(self._monitor_address, tuple):
+ moncdev = "socket,id=mon,host={},port={}".format(
+ *self._monitor_address
+ )
+ else:
+ moncdev = f"socket,id=mon,path={self._monitor_address}"
+ args.extend(['-chardev', moncdev, '-mon',
+ 'chardev=mon,mode=control'])
+
+ if self._machine is not None:
+ args.extend(['-machine', self._machine])
+ for _ in range(self._console_index):
+ args.extend(['-serial', 'null'])
+ if self._console_set:
+ chardev = ('socket,id=console,path=%s,server=on,wait=off' %
+ self._console_address)
+ args.extend(['-chardev', chardev])
+ if self._console_device_type is None:
+ args.extend(['-serial', 'chardev:console'])
+ else:
+ device = '%s,chardev=console' % self._console_device_type
+ args.extend(['-device', device])
+ return args
+
+ @property
+ def args(self) -> List[str]:
+ """Returns the list of arguments given to the QEMU binary."""
+ return self._args
+
+ def _pre_launch(self) -> None:
+ if self._console_set:
+ self._remove_files.append(self._console_address)
+
+ if self._qmp_set:
+ if isinstance(self._monitor_address, str):
+ self._remove_files.append(self._monitor_address)
+ self._qmp_connection = QEMUMonitorProtocol(
+ self._monitor_address,
+ server=True,
+ nickname=self._name
+ )
+
+ # NOTE: Make sure any opened resources are *definitely* freed in
+ # _post_shutdown()!
+ # pylint: disable=consider-using-with
+ self._qemu_log_path = os.path.join(self.log_dir, self._name + ".log")
+ self._qemu_log_file = open(self._qemu_log_path, 'wb')
+
+ self._iolog = None
+ self._qemu_full_args = tuple(chain(
+ self._wrapper,
+ [self._binary],
+ self._base_args,
+ self._args
+ ))
+
+ def _post_launch(self) -> None:
+ if self._qmp_connection:
+ self._qmp.accept(self._qmp_timer)
+
+ def _close_qemu_log_file(self) -> None:
+ if self._qemu_log_file is not None:
+ self._qemu_log_file.close()
+ self._qemu_log_file = None
+
+ def _post_shutdown(self) -> None:
+ """
+ Called to cleanup the VM instance after the process has exited.
+ May also be called after a failed launch.
+ """
+ try:
+ self._close_qmp_connection()
+ except Exception as err: # pylint: disable=broad-except
+ LOG.warning(
+ "Exception closing QMP connection: %s",
+ str(err) if str(err) else type(err).__name__
+ )
+ finally:
+ assert self._qmp_connection is None
+
+ self._close_qemu_log_file()
+
+ self._load_io_log()
+
+ self._qemu_log_path = None
+
+ if self._temp_dir is not None:
+ shutil.rmtree(self._temp_dir)
+ self._temp_dir = None
+
+ while len(self._remove_files) > 0:
+ self._remove_if_exists(self._remove_files.pop())
+
+ exitcode = self.exitcode()
+ if (exitcode is not None and exitcode < 0
+ and not (self._user_killed and exitcode == -signal.SIGKILL)):
+ msg = 'qemu received signal %i; command: "%s"'
+ if self._qemu_full_args:
+ command = ' '.join(self._qemu_full_args)
+ else:
+ command = ''
+ LOG.warning(msg, -int(exitcode), command)
+
+ self._quit_issued = False
+ self._user_killed = False
+ self._launched = False
+
+ def launch(self) -> None:
+ """
+ Launch the VM and make sure we cleanup and expose the
+ command line/output in case of exception
+ """
+
+ if self._launched:
+ raise QEMUMachineError('VM already launched')
+
+ try:
+ self._launch()
+ except:
+ # We may have launched the process but it may
+ # have exited before we could connect via QMP.
+ # Assume the VM didn't launch or is exiting.
+ # If we don't wait for the process, exitcode() may still be
+ # 'None' by the time control is ceded back to the caller.
+ if self._launched:
+ self.wait()
+ else:
+ self._post_shutdown()
+
+ LOG.debug('Error launching VM')
+ if self._qemu_full_args:
+ LOG.debug('Command: %r', ' '.join(self._qemu_full_args))
+ if self._iolog:
+ LOG.debug('Output: %r', self._iolog)
+ raise
+
+ def _launch(self) -> None:
+ """
+ Launch the VM and establish a QMP connection
+ """
+ self._pre_launch()
+ LOG.debug('VM launch command: %r', ' '.join(self._qemu_full_args))
+
+ # Cleaning up of this subprocess is guaranteed by _do_shutdown.
+ # pylint: disable=consider-using-with
+ self._popen = subprocess.Popen(self._qemu_full_args,
+ stdin=subprocess.DEVNULL,
+ stdout=self._qemu_log_file,
+ stderr=subprocess.STDOUT,
+ shell=False,
+ close_fds=False)
+ self._launched = True
+ self._post_launch()
+
+ def _close_qmp_connection(self) -> None:
+ """
+ Close the underlying QMP connection, if any.
+
+ Dutifully report errors that occurred while closing, but assume
+ that any error encountered indicates an abnormal termination
+ process and not a failure to close.
+ """
+ if self._qmp_connection is None:
+ return
+
+ try:
+ self._qmp.close()
+ except EOFError:
+ # EOF can occur as an Exception here when using the Async
+ # QMP backend. It indicates that the server closed the
+ # stream. If we successfully issued 'quit' at any point,
+ # then this was expected. If the remote went away without
+ # our permission, it's worth reporting that as an abnormal
+ # shutdown case.
+ if not (self._user_killed or self._quit_issued):
+ raise
+ finally:
+ self._qmp_connection = None
+
+ def _early_cleanup(self) -> None:
+ """
+ Perform any cleanup that needs to happen before the VM exits.
+
+ This method may be called twice upon shutdown, once each by soft
+ and hard shutdown in failover scenarios.
+ """
+ # If we keep the console socket open, we may deadlock waiting
+ # for QEMU to exit, while QEMU is waiting for the socket to
+ # become writeable.
+ if self._console_socket is not None:
+ self._console_socket.close()
+ self._console_socket = None
+
+    def _hard_shutdown(self) -> None:
+        """
+        Perform early cleanup, kill the VM, and wait for it to terminate.
+
+        :raise subprocess.TimeoutExpired: When the 60 second timeout is
+            exceeded waiting for the QEMU process to terminate.
+        """
+        self._early_cleanup()
+        self._subp.kill()
+        self._subp.wait(timeout=60)
+
+    def _soft_shutdown(self, timeout: Optional[int]) -> None:
+        """
+        Perform early cleanup, attempt to gracefully shut down the VM, and wait
+        for it to terminate.
+
+        :param timeout: Timeout in seconds for graceful shutdown.
+                        A value of None is an infinite wait.
+
+        :raise ConnectionResetError: On QMP communication errors
+        :raise subprocess.TimeoutExpired: When timeout is exceeded waiting for
+            the QEMU process to terminate.
+        """
+        self._early_cleanup()
+
+        if self._qmp_connection:
+            try:
+                if not self._quit_issued:
+                    # May raise ExecInterruptedError or StateError if the
+                    # connection dies or has *already* died.
+                    self.qmp('quit')
+            finally:
+                # Regardless, we want to quiesce the connection.
+                self._close_qmp_connection()
+
+        # May raise subprocess.TimeoutExpired
+        self._subp.wait(timeout=timeout)
+
+    def _do_shutdown(self, timeout: Optional[int]) -> None:
+        """
+        Attempt to shutdown the VM gracefully; fallback to a hard shutdown.
+
+        :param timeout: Timeout in seconds for graceful shutdown.
+                        A value of None is an infinite wait.
+
+        :raise AbnormalShutdown: When the VM could not be shut down gracefully.
+            The inner exception will likely be ConnectionReset or
+            subprocess.TimeoutExpired. In rare cases, non-graceful termination
+            may result in its own exceptions, likely subprocess.TimeoutExpired.
+        """
+        try:
+            self._soft_shutdown(timeout)
+        except Exception as exc:
+            # Graceful path failed; force the process down before reporting.
+            self._hard_shutdown()
+            raise AbnormalShutdown("Could not perform graceful shutdown") \
+                from exc
+
+    def shutdown(self,
+                 hard: bool = False,
+                 timeout: Optional[int] = 30) -> None:
+        """
+        Terminate the VM (gracefully if possible) and perform cleanup.
+        Cleanup will always be performed.
+
+        If the VM has not yet been launched, or shutdown(), wait(), or kill()
+        have already been called, this method does nothing.
+
+        :param hard: When true, do not attempt graceful shutdown, and
+                     suppress the SIGKILL warning log message.
+        :param timeout: Optional timeout in seconds for graceful shutdown.
+                        Default 30 seconds, a `None` value is an infinite wait.
+        """
+        if not self._launched:
+            return
+
+        try:
+            if hard:
+                # Record the intent so hard-kill is not reported as abnormal.
+                self._user_killed = True
+                self._hard_shutdown()
+            else:
+                self._do_shutdown(timeout)
+        finally:
+            self._post_shutdown()
+
+    def kill(self) -> None:
+        """
+        Terminate the VM forcefully, wait for it to exit, and perform cleanup.
+
+        Convenience wrapper for shutdown(hard=True).
+        """
+        self.shutdown(hard=True)
+
+    def wait(self, timeout: Optional[int] = 30) -> None:
+        """
+        Wait for the VM to power off and perform post-shutdown cleanup.
+
+        :param timeout: Optional timeout in seconds. Default 30 seconds.
+                        A value of `None` is an infinite wait.
+        """
+        # The guest is expected to exit on its own; treat it as if
+        # 'quit' had been issued so shutdown does not send one.
+        self._quit_issued = True
+        self.shutdown(timeout=timeout)
+
+    def set_qmp_monitor(self, enabled: bool = True) -> None:
+        """
+        Set the QMP monitor.
+
+        @param enabled: if False, qmp monitor options will be removed from
+                        the base arguments of the resulting QEMU command
+                        line. Default is True.
+
+        .. note:: Call this function before launch().
+        """
+        self._qmp_set = enabled
+
+    @property
+    def _qmp(self) -> QEMUMonitorProtocol:
+        # Accessor that guarantees a live QMP connection: raises instead of
+        # returning None so callers can use the result unconditionally.
+        if self._qmp_connection is None:
+            raise QEMUMachineError("Attempt to access QMP with no connection")
+        return self._qmp_connection
+
+    @classmethod
+    def _qmp_args(cls, conv_keys: bool,
+                  args: Dict[str, Any]) -> Dict[str, object]:
+        # QMP argument names use dashes; Python keyword arguments cannot,
+        # so optionally translate underscores to dashes.
+        if conv_keys:
+            return {k.replace('_', '-'): v for k, v in args.items()}
+
+        return args
+
+    def qmp(self, cmd: str,
+            args_dict: Optional[Dict[str, object]] = None,
+            conv_keys: Optional[bool] = None,
+            **args: Any) -> QMPMessage:
+        """
+        Invoke a QMP command and return the response dict.
+
+        :param cmd: QMP command name.
+        :param args_dict: Command arguments passed through verbatim
+            (no key conversion); mutually exclusive with **args/conv_keys.
+        :param conv_keys: When true (the default for **args), convert
+            underscores in argument names to dashes.
+        """
+        if args_dict is not None:
+            assert not args
+            assert conv_keys is None
+            args = args_dict
+            conv_keys = False
+
+        if conv_keys is None:
+            conv_keys = True
+
+        qmp_args = self._qmp_args(conv_keys, args)
+        ret = self._qmp.cmd(cmd, args=qmp_args)
+        # Record a successful 'quit' so shutdown knows it was requested.
+        if cmd == 'quit' and 'error' not in ret and 'return' in ret:
+            self._quit_issued = True
+        return ret
+
+    def command(self, cmd: str,
+                conv_keys: bool = True,
+                **args: Any) -> QMPReturnValue:
+        """
+        Invoke a QMP command.
+        On success return the response dict.
+        On failure raise an exception.
+        """
+        qmp_args = self._qmp_args(conv_keys, args)
+        ret = self._qmp.command(cmd, **qmp_args)
+        # command() raises on error, so reaching here means 'quit' succeeded.
+        if cmd == 'quit':
+            self._quit_issued = True
+        return ret
+
+    def get_qmp_event(self, wait: bool = False) -> Optional[QMPMessage]:
+        """
+        Poll for a single queued QMP event and return it.
+        """
+        # Prefer locally cached events before asking the connection.
+        if self._events:
+            return self._events.pop(0)
+        return self._qmp.pull_event(wait=wait)
+
+    def get_qmp_events(self, wait: bool = False) -> List[QMPMessage]:
+        """
+        Poll for queued QMP events and return a list of dicts
+        """
+        events = self._qmp.get_events(wait=wait)
+        # NOTE(review): locally cached events are appended *after* newly
+        # polled ones, so the result may not be strictly chronological.
+        events.extend(self._events)
+        del self._events[:]
+        return events
+
+    @staticmethod
+    def event_match(event: Any, match: Optional[Any]) -> bool:
+        """
+        Check if an event matches optional match criteria.
+
+        The match criteria takes the form of a matching subdict. The event is
+        checked to be a superset of the subdict, recursively, with matching
+        values whenever the subdict values are not None.
+
+        This has a limitation that you cannot explicitly check for None values.
+
+        Examples, with the subdict queries on the left:
+         - None matches any object.
+         - {"foo": None} matches {"foo": {"bar": 1}}
+         - {"foo": None} matches {"foo": 5}
+         - {"foo": {"abc": None}} does not match {"foo": {"bar": 1}}
+         - {"foo": {"rab": 2}} matches {"foo": {"bar": 1, "rab": 2}}
+        """
+        if match is None:
+            return True
+
+        try:
+            for key in match:
+                if key in event:
+                    # Recurse into nested criteria.
+                    if not QEMUMachine.event_match(event[key], match[key]):
+                        return False
+                else:
+                    return False
+            return True
+        except TypeError:
+            # either match or event wasn't iterable (not a dict)
+            return bool(match == event)
+
+    def event_wait(self, name: str,
+                   timeout: float = 60.0,
+                   match: Optional[QMPMessage] = None) -> Optional[QMPMessage]:
+        """
+        event_wait waits for and returns a named event from QMP with a timeout.
+
+        name: The event to wait for.
+        timeout: QEMUMonitorProtocol.pull_event timeout parameter.
+        match: Optional match criteria. See event_match for details.
+        """
+        # Single-event convenience wrapper around events_wait().
+        return self.events_wait([(name, match)], timeout)
+
+    def events_wait(self,
+                    events: Sequence[Tuple[str, Any]],
+                    timeout: float = 60.0) -> Optional[QMPMessage]:
+        """
+        events_wait waits for and returns a single named event from QMP.
+        In the case of multiple qualifying events, this function returns the
+        first one.
+
+        :param events: A sequence of (name, match_criteria) tuples.
+                       The match criteria are optional and may be None.
+                       See event_match for details.
+        :param timeout: Optional timeout, in seconds.
+                        See QEMUMonitorProtocol.pull_event.
+
+        :raise QMPTimeoutError: If timeout was non-zero and no matching events
+                                were found.
+        :return: A QMP event matching the filter criteria.
+                 If timeout was 0 and no event matched, None.
+        """
+        def _match(event: QMPMessage) -> bool:
+            # True when the event name and optional criteria both match.
+            for name, match in events:
+                if event['event'] == name and self.event_match(event, match):
+                    return True
+            return False
+
+        # Pre-declared for static analysis across the two loops below.
+        event: Optional[QMPMessage]
+
+        # Search cached events
+        for event in self._events:
+            if _match(event):
+                self._events.remove(event)
+                return event
+
+        # Poll for new events
+        while True:
+            event = self._qmp.pull_event(wait=timeout)
+            if event is None:
+                # NB: None is only returned when timeout is false-ish.
+                # Timeouts raise QMPTimeoutError instead!
+                break
+            if _match(event):
+                return event
+            # Cache non-matching events for later retrieval.
+            self._events.append(event)
+
+        return None
+
+    def get_log(self) -> Optional[str]:
+        """
+        After self.shutdown or failed qemu execution, this returns the output
+        of the qemu process. Returns None if no output has been captured.
+        """
+        return self._iolog
+
+    def add_args(self, *args: str) -> None:
+        """
+        Adds to the list of extra arguments to be given to the QEMU binary.
+
+        .. note:: Call this function before launch().
+        """
+        self._args.extend(args)
+
+    def set_machine(self, machine_type: str) -> None:
+        """
+        Sets the machine type
+
+        If set, the machine type will be added to the base arguments
+        of the resulting QEMU command line.
+        """
+        self._machine = machine_type
+
+    def set_console(self,
+                    device_type: Optional[str] = None,
+                    console_index: int = 0) -> None:
+        """
+        Sets the device type for a console device
+
+        If set, the console device and a backing character device will
+        be added to the base arguments of the resulting QEMU command
+        line.
+
+        This is a convenience method that will either use the provided
+        device type, or default to a "-serial chardev:console" command
+        line argument.
+
+        The actual setting of command line arguments will be done at
+        machine launch time, as it depends on the temporary directory
+        to be created.
+
+        @param device_type: the device type, such as "isa-serial". If
+                            None is given (the default value) a "-serial
+                            chardev:console" command line argument will
+                            be used instead, resorting to the machine's
+                            default device type.
+        @param console_index: the index of the console device to use.
+                              If not zero, the command line will create
+                              'index - 1' consoles and connect them to
+                              the 'null' backing character device.
+        """
+        self._console_set = True
+        self._console_device_type = device_type
+        self._console_index = console_index
+
+    @property
+    def console_socket(self) -> socket.socket:
+        """
+        Returns a socket connected to the console
+        """
+        # Lazily created on first access and reused thereafter.
+        if self._console_socket is None:
+            self._console_socket = console_socket.ConsoleSocket(
+                self._console_address,
+                file=self._console_log_path,
+                drain=self._drain_console)
+        return self._console_socket
+
+    @property
+    def temp_dir(self) -> str:
+        """
+        Returns a temporary directory to be used for this machine
+        """
+        # Created on first access; cleanup is presumably handled during
+        # shutdown elsewhere in this class -- not visible here.
+        if self._temp_dir is None:
+            self._temp_dir = tempfile.mkdtemp(prefix="qemu-machine-",
+                                              dir=self._base_temp_dir)
+        return self._temp_dir
+
+    @property
+    def sock_dir(self) -> str:
+        """
+        Returns the directory used for sockfiles by this machine.
+
+        Falls back to the temporary directory when none was configured.
+        """
+        if self._sock_dir:
+            return self._sock_dir
+        return self.temp_dir
+
+    @property
+    def log_dir(self) -> str:
+        """
+        Returns a directory to be used for writing logs.
+
+        Falls back to the temporary directory when none was configured.
+        """
+        if self._log_dir is None:
+            return self.temp_dir
+        return self._log_dir
diff --git a/python/qemu/machine/py.typed b/python/qemu/machine/py.typed
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/qemu/machine/py.typed
diff --git a/python/qemu/machine/qtest.py b/python/qemu/machine/qtest.py
new file mode 100644
index 000000000..f2f9aaa5e
--- /dev/null
+++ b/python/qemu/machine/qtest.py
@@ -0,0 +1,163 @@
+"""
+QEMU qtest library
+
+qtest offers the QEMUQtestProtocol and QEMUQTestMachine classes, which
+offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
+subclass of QEMUMachine, respectively.
+"""
+
+# Copyright (C) 2015 Red Hat Inc.
+#
+# Authors:
+# Fam Zheng <famz@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+# Based on qmp.py.
+#
+
+import os
+import socket
+from typing import (
+ List,
+ Optional,
+ Sequence,
+ TextIO,
+)
+
+from qemu.qmp import SocketAddrT # pylint: disable=import-error
+
+from .machine import QEMUMachine
+
+
+class QEMUQtestProtocol:
+    """
+    QEMUQtestProtocol implements a connection to a qtest socket.
+
+    :param address: QEMU address, can be either a unix socket path (string)
+                    or a tuple in the form ( address, port ) for a TCP
+                    connection
+    :param server: server mode, listens on the socket (bool)
+    :raise socket.error: on socket connection errors
+
+    .. note::
+       No connection is established by __init__(), this is done
+       by the connect() or accept() methods.
+    """
+    def __init__(self, address: SocketAddrT,
+                 server: bool = False):
+        self._address = address
+        self._sock = self._get_sock()
+        # Text-mode reader over the socket; created by connect()/accept().
+        self._sockfile: Optional[TextIO] = None
+        if server:
+            self._sock.bind(self._address)
+            self._sock.listen(1)
+
+    def _get_sock(self) -> socket.socket:
+        # A tuple address means TCP (host, port); a string is a Unix path.
+        if isinstance(self._address, tuple):
+            family = socket.AF_INET
+        else:
+            family = socket.AF_UNIX
+        return socket.socket(family, socket.SOCK_STREAM)
+
+    def connect(self) -> None:
+        """
+        Connect to the qtest socket.
+
+        @raise socket.error on socket connection errors
+        """
+        self._sock.connect(self._address)
+        self._sockfile = self._sock.makefile(mode='r')
+
+    def accept(self) -> None:
+        """
+        Await connection from QEMU.
+
+        @raise socket.error on socket connection errors
+        """
+        # Replace the listening socket with the accepted connection.
+        self._sock, _ = self._sock.accept()
+        self._sockfile = self._sock.makefile(mode='r')
+
+    def cmd(self, qtest_cmd: str) -> str:
+        """
+        Send a qtest command on the wire.
+
+        @param qtest_cmd: qtest command text to be sent
+        @return a single line of response text
+        """
+        assert self._sockfile is not None
+        self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
+        resp = self._sockfile.readline()
+        return resp
+
+    def close(self) -> None:
+        """
+        Close this socket.
+        """
+        self._sock.close()
+        if self._sockfile:
+            self._sockfile.close()
+            self._sockfile = None
+
+    def settimeout(self, timeout: Optional[float]) -> None:
+        """Set a timeout, in seconds."""
+        self._sock.settimeout(timeout)
+
+
+class QEMUQtestMachine(QEMUMachine):
+    """
+    A QEMU VM, with a qtest socket available.
+    """
+
+    def __init__(self,
+                 binary: str,
+                 args: Sequence[str] = (),
+                 wrapper: Sequence[str] = (),
+                 name: Optional[str] = None,
+                 base_temp_dir: str = "/var/tmp",
+                 sock_dir: Optional[str] = None,
+                 qmp_timer: Optional[float] = None):
+        # pylint: disable=too-many-arguments
+
+        if name is None:
+            name = "qemu-%d" % os.getpid()
+        if sock_dir is None:
+            sock_dir = base_temp_dir
+        super().__init__(binary, args, wrapper=wrapper, name=name,
+                         base_temp_dir=base_temp_dir,
+                         sock_dir=sock_dir, qmp_timer=qmp_timer)
+        # Created by _pre_launch(); None until then.
+        self._qtest: Optional[QEMUQtestProtocol] = None
+        self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
+
+    @property
+    def _base_args(self) -> List[str]:
+        # Extend the parent's arguments with the qtest socket and
+        # accelerator so the guest runs under qtest control.
+        args = super()._base_args
+        args.extend([
+            '-qtest', f"unix:path={self._qtest_path}",
+            '-accel', 'qtest'
+        ])
+        return args
+
+    def _pre_launch(self) -> None:
+        super()._pre_launch()
+        # Start listening before QEMU is launched; QEMU connects to us.
+        self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
+
+    def _post_launch(self) -> None:
+        assert self._qtest is not None
+        super()._post_launch()
+        self._qtest.accept()
+
+    def _post_shutdown(self) -> None:
+        super()._post_shutdown()
+        self._remove_if_exists(self._qtest_path)
+
+    def qtest(self, cmd: str) -> str:
+        """
+        Send a qtest command to the guest.
+
+        :param cmd: qtest command to send
+        :return: qtest server response
+        :raise RuntimeError: if the qtest socket is not available
+        """
+        if self._qtest is None:
+            raise RuntimeError("qtest socket not available")
+        return self._qtest.cmd(cmd)
diff --git a/python/qemu/qmp/README.rst b/python/qemu/qmp/README.rst
new file mode 100644
index 000000000..5bfb82535
--- /dev/null
+++ b/python/qemu/qmp/README.rst
@@ -0,0 +1,9 @@
+qemu.qmp package
+================
+
+This package provides a library used for connecting to and communicating
+with QMP servers. It is used extensively by iotests, vm tests,
+avocado tests, and other utilities in the ./scripts directory. It is
+not a fully-fledged SDK and is subject to change at any time.
+
+See the documentation in ``__init__.py`` for more information.
diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py
new file mode 100644
index 000000000..358c0971d
--- /dev/null
+++ b/python/qemu/qmp/__init__.py
@@ -0,0 +1,422 @@
+"""
+QEMU Monitor Protocol (QMP) development library & tooling.
+
+This package provides a fairly low-level class for communicating to QMP
+protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the
+QEMU Storage Daemon. This library is not intended for production use.
+
+`QEMUMonitorProtocol` is the primary class of interest, and all errors
+raised derive from `QMPError`.
+"""
+
+# Copyright (C) 2009, 2010 Red Hat Inc.
+#
+# Authors:
+# Luiz Capitulino <lcapitulino@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+
+import errno
+import json
+import logging
+import socket
+import struct
+from types import TracebackType
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ TextIO,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
+ cast,
+)
+
+
+#: QMPMessage is an entire QMP message of any kind.
+QMPMessage = Dict[str, Any]
+
+#: QMPReturnValue is the 'return' value of a command.
+QMPReturnValue = object
+
+#: QMPObject is any object in a QMP message.
+QMPObject = Dict[str, object]
+
+# QMPMessage can be outgoing commands or incoming events/returns.
+# QMPReturnValue is usually a dict/json object, but due to QAPI's
+# 'returns-whitelist', it can actually be anything.
+#
+# {'return': {}} is a QMPMessage,
+# {} is the QMPReturnValue.
+
+
+#: A TCP address as a (host, port) pair.
+InternetAddrT = Tuple[str, int]
+#: A Unix domain socket path.
+UnixAddrT = str
+#: Either supported socket address form.
+SocketAddrT = Union[InternetAddrT, UnixAddrT]
+
+
+class QMPError(Exception):
+    """
+    QMP base exception
+    """
+
+
+class QMPConnectError(QMPError):
+    """
+    QMP connection exception
+    """
+
+
+class QMPCapabilitiesError(QMPError):
+    """
+    QMP negotiate capabilities exception
+    """
+
+
+class QMPTimeoutError(QMPError):
+    """
+    QMP timeout exception
+    """
+
+
+class QMPProtocolError(QMPError):
+    """
+    QMP protocol error; unexpected response
+    """
+
+
+class QMPResponseError(QMPError):
+    """
+    Represents erroneous QMP monitor reply
+    """
+    def __init__(self, reply: QMPMessage):
+        try:
+            desc = reply['error']['desc']
+        except KeyError:
+            # Malformed error reply: fall back to showing the whole message.
+            desc = reply
+        super().__init__(desc)
+        # Full QMP reply, kept for callers needing more than the text.
+        self.reply = reply
+
+
+class QMPBadPortError(QMPError):
+    """
+    Unable to parse socket address: Port was non-numerical.
+    """
+
+
+class QEMUMonitorProtocol:
+    """
+    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
+    allow to handle commands and events.
+    """
+
+    #: Logger object for debugging messages
+    logger = logging.getLogger('QMP')
+
+    def __init__(self, address: SocketAddrT,
+                 server: bool = False,
+                 nickname: Optional[str] = None):
+        """
+        Create a QEMUMonitorProtocol class.
+
+        @param address: QEMU address, can be either a unix socket path (string)
+                        or a tuple in the form ( address, port ) for a TCP
+                        connection
+        @param server: server mode listens on the socket (bool)
+        @param nickname: optional name appended to the logger name, to tell
+                         apart multiple connections
+        @raise OSError on socket connection errors
+        @note No connection is established, this is done by the connect() or
+              accept() methods
+        """
+        self.__events: List[QMPMessage] = []
+        self.__address = address
+        self.__sock = self.__get_sock()
+        self.__sockfile: Optional[TextIO] = None
+        self._nickname = nickname
+        if self._nickname:
+            self.logger = logging.getLogger('QMP').getChild(self._nickname)
+        if server:
+            # Allow quick re-binding of a recently used address.
+            self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            self.__sock.bind(self.__address)
+            self.__sock.listen(1)
+
+    def __get_sock(self) -> socket.socket:
+        # A tuple address selects TCP; a plain string is a Unix socket path.
+        if isinstance(self.__address, tuple):
+            family = socket.AF_INET
+        else:
+            family = socket.AF_UNIX
+        return socket.socket(family, socket.SOCK_STREAM)
+
+    def __negotiate_capabilities(self) -> QMPMessage:
+        # The server speaks first: its greeting carries a 'QMP' key.
+        greeting = self.__json_read()
+        if greeting is None or "QMP" not in greeting:
+            raise QMPConnectError
+        # Greeting seems ok, negotiate capabilities
+        resp = self.cmd('qmp_capabilities')
+        if resp and "return" in resp:
+            return greeting
+        raise QMPCapabilitiesError
+
+    def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
+        """
+        Read one JSON message from the stream; events seen along the way
+        are appended to the __events cache.
+
+        @param only_event: return as soon as an event is read instead of
+                           continuing until a non-event message arrives.
+        @return the decoded message, or None on EOF.
+        """
+        assert self.__sockfile is not None
+        while True:
+            data = self.__sockfile.readline()
+            if not data:
+                return None
+            # By definition, any JSON received from QMP is a QMPMessage,
+            # and we are asserting only at static analysis time that it
+            # has a particular shape.
+            resp: QMPMessage = json.loads(data)
+            if 'event' in resp:
+                self.logger.debug("<<< %s", resp)
+                self.__events.append(resp)
+                if not only_event:
+                    continue
+            return resp
+
+    def __get_events(self, wait: Union[bool, float] = False) -> None:
+        """
+        Check for new events in the stream and cache them in __events.
+
+        @param wait (bool): block until an event is available.
+        @param wait (float): If wait is a float, treat it as a timeout value.
+
+        @raise QMPTimeoutError: If a timeout float is provided and the timeout
+                                period elapses.
+        @raise QMPConnectError: If wait is True but no events could be
+                                retrieved or if some other error occurred.
+        """
+
+        # Current timeout and blocking status
+        current_timeout = self.__sock.gettimeout()
+
+        # Check for new events regardless and pull them into the cache:
+        self.__sock.settimeout(0)  # i.e. setblocking(False)
+        try:
+            self.__json_read()
+        except OSError as err:
+            # EAGAIN: No data available; not critical
+            if err.errno != errno.EAGAIN:
+                raise
+        finally:
+            # Always restore the previous timeout.
+            self.__sock.settimeout(current_timeout)
+
+        # Wait for new events, if needed.
+        # if wait is 0.0, this means "no wait" and is also implicitly false.
+        if not self.__events and wait:
+            if isinstance(wait, float):
+                self.__sock.settimeout(wait)
+            try:
+                ret = self.__json_read(only_event=True)
+            except socket.timeout as err:
+                raise QMPTimeoutError("Timeout waiting for event") from err
+            except Exception as err:
+                msg = "Error while reading from socket"
+                raise QMPConnectError(msg) from err
+            finally:
+                self.__sock.settimeout(current_timeout)
+
+            # None here means EOF rather than a timeout.
+            if ret is None:
+                raise QMPConnectError("Error while reading from socket")
+
+    # TypeVar so subclasses' context managers keep their own type.
+    T = TypeVar('T')
+
+    def __enter__(self: T) -> T:
+        # Implement context manager enter function.
+        return self
+
+    def __exit__(self,
+                 # pylint: disable=duplicate-code
+                 # see https://github.com/PyCQA/pylint/issues/3619
+                 exc_type: Optional[Type[BaseException]],
+                 exc_val: Optional[BaseException],
+                 exc_tb: Optional[TracebackType]) -> None:
+        # Implement context manager exit function.
+        self.close()
+
+    @classmethod
+    def parse_address(cls, address: str) -> SocketAddrT:
+        """
+        Parse a string into a QMP address.
+
+        Figure out if the argument is in the host:port form.
+        If it's not, it's probably a file path.
+        """
+        components = address.split(':')
+        if len(components) == 2:
+            try:
+                port = int(components[1])
+            except ValueError:
+                msg = f"Bad port: '{components[1]}' in '{address}'."
+                raise QMPBadPortError(msg) from None
+            return (components[0], port)
+
+        # Treat as filepath.
+        return address
+
+    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
+        """
+        Connect to the QMP Monitor and perform capabilities negotiation.
+
+        @return QMP greeting dict, or None if negotiate is false
+        @raise OSError on socket connection errors
+        @raise QMPConnectError if the greeting is not received
+        @raise QMPCapabilitiesError if fails to negotiate capabilities
+        """
+        self.__sock.connect(self.__address)
+        self.__sockfile = self.__sock.makefile(mode='r')
+        if negotiate:
+            return self.__negotiate_capabilities()
+        return None
+
+    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
+        """
+        Await connection from QMP Monitor and perform capabilities negotiation.
+
+        @param timeout: timeout in seconds (nonnegative float number, or
+                        None). The value passed will set the behavior of the
+                        underneath QMP socket as described in [1].
+                        Default value is set to 15.0.
+
+        @return QMP greeting dict
+        @raise OSError on socket connection errors
+        @raise QMPConnectError if the greeting is not received
+        @raise QMPCapabilitiesError if fails to negotiate capabilities
+
+        [1]
+        https://docs.python.org/3/library/socket.html#socket.socket.settimeout
+        """
+        self.__sock.settimeout(timeout)
+        # Replace the listening socket with the accepted connection.
+        self.__sock, _ = self.__sock.accept()
+        self.__sockfile = self.__sock.makefile(mode='r')
+        return self.__negotiate_capabilities()
+
+    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
+        """
+        Send a QMP command to the QMP Monitor.
+
+        @param qmp_cmd: QMP command to be sent as a Python dict
+        @return QMP response as a Python dict
+        @raise QMPConnectError if the connection returns EOF
+        """
+        self.logger.debug(">>> %s", qmp_cmd)
+        self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
+        resp = self.__json_read()
+        if resp is None:
+            raise QMPConnectError("Unexpected empty reply from server")
+        self.logger.debug("<<< %s", resp)
+        return resp
+
+    def cmd(self, name: str,
+            args: Optional[Dict[str, object]] = None,
+            cmd_id: Optional[object] = None) -> QMPMessage:
+        """
+        Build a QMP command and send it to the QMP Monitor.
+
+        @param name: command name (string)
+        @param args: command arguments (dict)
+        @param cmd_id: command id (dict, list, string or int)
+        """
+        qmp_cmd: QMPMessage = {'execute': name}
+        if args:
+            qmp_cmd['arguments'] = args
+        if cmd_id:
+            qmp_cmd['id'] = cmd_id
+        return self.cmd_obj(qmp_cmd)
+
+    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
+        """
+        Build and send a QMP command to the monitor, report errors if any
+        """
+        ret = self.cmd(cmd, kwds)
+        if 'error' in ret:
+            raise QMPResponseError(ret)
+        if 'return' not in ret:
+            raise QMPProtocolError(
+                "'return' key not found in QMP response '{}'".format(str(ret))
+            )
+        return cast(QMPReturnValue, ret['return'])
+
+    def pull_event(self,
+                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
+        """
+        Pulls a single event.
+
+        @param wait (bool): block until an event is available.
+        @param wait (float): If wait is a float, treat it as a timeout value.
+
+        @raise QMPTimeoutError: If a timeout float is provided and the timeout
+                                period elapses.
+        @raise QMPConnectError: If wait is True but no events could be
+                                retrieved or if some other error occurred.
+
+        @return The first available QMP event, or None.
+        """
+        self.__get_events(wait)
+
+        if self.__events:
+            return self.__events.pop(0)
+        return None
+
+    def get_events(self, wait: bool = False) -> List[QMPMessage]:
+        """
+        Get a list of available QMP events and clear all pending events.
+
+        @param wait (bool): block until an event is available.
+        @param wait (float): If wait is a float, treat it as a timeout value.
+
+        @raise QMPTimeoutError: If a timeout float is provided and the timeout
+                                period elapses.
+        @raise QMPConnectError: If wait is True but no events could be
+                                retrieved or if some other error occurred.
+
+        @return The list of available QMP events.
+        """
+        self.__get_events(wait)
+        events = self.__events
+        self.__events = []
+        return events
+
+    def clear_events(self) -> None:
+        """
+        Clear current list of pending events.
+        """
+        self.__events = []
+
+    def close(self) -> None:
+        """
+        Close the socket and socket file.
+        """
+        if self.__sock:
+            self.__sock.close()
+        if self.__sockfile:
+            self.__sockfile.close()
+
+    def settimeout(self, timeout: Optional[float]) -> None:
+        """
+        Set the socket timeout.
+
+        @param timeout (float): timeout in seconds (non-zero), or None.
+        @note This is a wrap around socket.settimeout
+
+        @raise ValueError: if timeout was set to 0.
+        """
+        if timeout == 0:
+            msg = "timeout cannot be 0; this engages non-blocking mode."
+            msg += " Use 'None' instead to disable timeouts."
+            raise ValueError(msg)
+        self.__sock.settimeout(timeout)
+
+    def send_fd_scm(self, fd: int) -> None:
+        """
+        Send a file descriptor to the remote via SCM_RIGHTS.
+        """
+        if self.__sock.family != socket.AF_UNIX:
+            raise RuntimeError("Can't use SCM_RIGHTS on non-AF_UNIX socket.")
+
+        # A one-byte payload is required to carry the ancillary data.
+        self.__sock.sendmsg(
+            [b' '],
+            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
+        )
diff --git a/python/qemu/qmp/py.typed b/python/qemu/qmp/py.typed
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/qemu/qmp/py.typed
diff --git a/python/qemu/qmp/qemu_ga_client.py b/python/qemu/qmp/qemu_ga_client.py
new file mode 100644
index 000000000..67ac0b421
--- /dev/null
+++ b/python/qemu/qmp/qemu_ga_client.py
@@ -0,0 +1,323 @@
+"""
+QEMU Guest Agent Client
+
+Usage:
+
+Start QEMU with:
+
+# qemu [...] -chardev socket,path=/tmp/qga.sock,server,wait=off,id=qga0 \
+ -device virtio-serial \
+ -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0
+
+Run the script:
+
+$ qemu-ga-client --address=/tmp/qga.sock <command> [args...]
+
+or
+
+$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock
+$ qemu-ga-client <command> [args...]
+
+For example:
+
+$ qemu-ga-client cat /etc/resolv.conf
+# Generated by NetworkManager
+nameserver 10.0.2.3
+$ qemu-ga-client fsfreeze status
+thawed
+$ qemu-ga-client fsfreeze freeze
+2 filesystems frozen
+
+See also: https://wiki.qemu.org/Features/QAPI/GuestAgent
+"""
+
+# Copyright (C) 2012 Ryota Ozaki <ozaki.ryota@gmail.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+
+import argparse
+import base64
+import errno
+import os
+import random
+import sys
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Optional,
+ Sequence,
+)
+
+from qemu import qmp
+from qemu.qmp import SocketAddrT
+
+
+# This script has not seen many patches or careful attention in quite
+# some time. If you would like to improve it, please review the design
+# carefully and add docstrings at that point in time. Until then:
+
+# pylint: disable=missing-docstring
+
+
+class QemuGuestAgent(qmp.QEMUMonitorProtocol):
+    # Dynamically maps attribute access onto guest agent commands:
+    # e.g. agent.fsfreeze_status() issues the QMP command
+    # 'guest-fsfreeze-status'.
+    def __getattr__(self, name: str) -> Callable[..., Any]:
+        def wrapper(**kwds: object) -> object:
+            return self.command('guest-' + name.replace('_', '-'), **kwds)
+        return wrapper
+
+
+class QemuGuestAgentClient:
+    def __init__(self, address: SocketAddrT):
+        self.qga = QemuGuestAgent(address)
+        # The guest agent does not use QMP capabilities negotiation.
+        self.qga.connect(negotiate=False)
+
+    def sync(self, timeout: Optional[float] = 3) -> None:
+        # Avoid being blocked forever
+        if not self.ping(timeout):
+            raise EnvironmentError('Agent seems not alive')
+        # Loop until the agent echoes back our random sync token.
+        uid = random.randint(0, (1 << 32) - 1)
+        while True:
+            ret = self.qga.sync(id=uid)
+            if isinstance(ret, int) and int(ret) == uid:
+                break
+
+    def __file_read_all(self, handle: int) -> bytes:
+        # Read the open guest file in 1024-byte chunks until EOF.
+        eof = False
+        data = b''
+        while not eof:
+            ret = self.qga.file_read(handle=handle, count=1024)
+            _data = base64.b64decode(ret['buf-b64'])
+            data += _data
+            eof = ret['eof']
+        return data
+
+    def read(self, path: str) -> bytes:
+        handle = self.qga.file_open(path=path)
+        try:
+            data = self.__file_read_all(handle)
+        finally:
+            # Always release the guest-side file handle.
+            self.qga.file_close(handle=handle)
+        return data
+
+    def info(self) -> str:
+        # Render guest-info as human-readable text: version plus
+        # enabled/disabled command lists.
+        info = self.qga.info()
+
+        msgs = []
+        msgs.append('version: ' + info['version'])
+        msgs.append('supported_commands:')
+        enabled = [c['name'] for c in info['supported_commands']
+                   if c['enabled']]
+        msgs.append('\tenabled: ' + ', '.join(enabled))
+        disabled = [c['name'] for c in info['supported_commands']
+                    if not c['enabled']]
+        msgs.append('\tdisabled: ' + ', '.join(disabled))
+
+        return '\n'.join(msgs)
+
+    @classmethod
+    def __gen_ipv4_netmask(cls, prefixlen: int) -> str:
+        # Convert a prefix length to dotted-quad form, e.g. 24 -> 255.255.255.0
+        mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2)
+        return '.'.join([str(mask >> 24),
+                         str((mask >> 16) & 0xff),
+                         str((mask >> 8) & 0xff),
+                         str(mask & 0xff)])
+
+    def ifconfig(self) -> str:
+        # Render guest network interfaces in an ifconfig-like format.
+        nifs = self.qga.network_get_interfaces()
+
+        msgs = []
+        for nif in nifs:
+            msgs.append(nif['name'] + ':')
+            if 'ip-addresses' in nif:
+                for ipaddr in nif['ip-addresses']:
+                    if ipaddr['ip-address-type'] == 'ipv4':
+                        addr = ipaddr['ip-address']
+                        mask = self.__gen_ipv4_netmask(int(ipaddr['prefix']))
+                        msgs.append(f"\tinet {addr} netmask {mask}")
+                    elif ipaddr['ip-address-type'] == 'ipv6':
+                        addr = ipaddr['ip-address']
+                        prefix = ipaddr['prefix']
+                        msgs.append(f"\tinet6 {addr} prefixlen {prefix}")
+            # Suppress the all-zero MAC of pseudo-interfaces.
+            if nif['hardware-address'] != '00:00:00:00:00:00':
+                msgs.append("\tether " + nif['hardware-address'])
+
+        return '\n'.join(msgs)
+
+    def ping(self, timeout: Optional[float]) -> bool:
+        self.qga.settimeout(timeout)
+        try:
+            self.qga.ping()
+        except TimeoutError:
+            return False
+        return True
+
+    def fsfreeze(self, cmd: str) -> object:
+        if cmd not in ['status', 'freeze', 'thaw']:
+            raise Exception('Invalid command: ' + cmd)
+        # Can be int (freeze, thaw) or GuestFsfreezeStatus (status)
+        return getattr(self.qga, 'fsfreeze' + '_' + cmd)()
+
+    def fstrim(self, minimum: int) -> Dict[str, object]:
+        # returns GuestFilesystemTrimResponse
+        ret = getattr(self.qga, 'fstrim')(minimum=minimum)
+        assert isinstance(ret, dict)
+        return ret
+
+    def suspend(self, mode: str) -> None:
+        if mode not in ['disk', 'ram', 'hybrid']:
+            raise Exception('Invalid mode: ' + mode)
+
+        try:
+            getattr(self.qga, 'suspend' + '_' + mode)()
+            # On error exception will raise
+        except TimeoutError:
+            # On success command will timed out
+            return
+
+    def shutdown(self, mode: str = 'powerdown') -> None:
+        if mode not in ['powerdown', 'halt', 'reboot']:
+            raise Exception('Invalid mode: ' + mode)
+
+        # Like suspend, a timeout here indicates the guest went away,
+        # which is the expected outcome.
+        try:
+            self.qga.shutdown(mode=mode)
+        except TimeoutError:
+            pass
+
+
+def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    # Print the contents of a single guest file.
+    if len(args) != 1:
+        print('Invalid argument')
+        print('Usage: cat <file>')
+        sys.exit(1)
+    print(client.read(args[0]))
+
+
+def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    usage = 'Usage: fsfreeze status|freeze|thaw'
+    if len(args) != 1:
+        print('Invalid argument')
+        print(usage)
+        sys.exit(1)
+    if args[0] not in ['status', 'freeze', 'thaw']:
+        print('Invalid command: ' + args[0])
+        print(usage)
+        sys.exit(1)
+    cmd = args[0]
+    ret = client.fsfreeze(cmd)
+    if cmd == 'status':
+        # 'status' returns a GuestFsfreezeStatus string; print as-is.
+        print(ret)
+        return
+
+    # freeze/thaw return the number of filesystems affected.
+    assert isinstance(ret, int)
+    verb = 'frozen' if cmd == 'freeze' else 'thawed'
+    print(f"{ret:d} filesystems {verb}")
+
+
+def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    # Optional first argument: minimum extent size to trim (default 0).
+    if len(args) == 0:
+        minimum = 0
+    else:
+        minimum = int(args[0])
+    print(client.fstrim(minimum))
+
+
+def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    # Takes no arguments.
+    assert not args
+    print(client.ifconfig())
+
+
+def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    # Takes no arguments.
+    assert not args
+    print(client.info())
+
+
+def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    # Optional first argument: timeout in seconds (default 3.0).
+    timeout = 3.0 if len(args) == 0 else float(args[0])
+    alive = client.ping(timeout)
+    if not alive:
+        # NOTE(review): args may be empty here, in which case args[0]
+        # raises IndexError; should print `timeout` instead.
+        print("Not responded in %s sec" % args[0])
+        sys.exit(1)
+
+
+def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    usage = 'Usage: suspend disk|ram|hybrid'
+    if len(args) != 1:
+        print('Less argument')
+        print(usage)
+        sys.exit(1)
+    if args[0] not in ['disk', 'ram', 'hybrid']:
+        print('Invalid command: ' + args[0])
+        print(usage)
+        sys.exit(1)
+    client.suspend(args[0])
+
+
+def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    # Takes no arguments; uses the default 'powerdown' mode.
+    assert not args
+    client.shutdown()
+
+
+# 'powerdown' is an alias for 'shutdown'.
+_cmd_powerdown = _cmd_shutdown
+
+
+def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    # Takes no arguments.
+    assert not args
+    client.shutdown('halt')
+
+
+def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    # Takes no arguments.
+    assert not args
+    client.shutdown('reboot')
+
+
+# All _cmd_* handlers above, exposed by bare name ('cat', 'fsfreeze', ...).
+commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m]
+
+
+def send_command(address: str, cmd: str, args: Sequence[str]) -> None:
+    # Validate the socket path and command name, connect, sync with the
+    # agent, then dispatch to the matching _cmd_* handler.
+    if not os.path.exists(address):
+        print('%s not found' % address)
+        sys.exit(1)
+
+    if cmd not in commands:
+        print('Invalid command: ' + cmd)
+        print('Available commands: ' + ', '.join(commands))
+        sys.exit(1)
+
+    try:
+        client = QemuGuestAgentClient(address)
+    except OSError as err:
+        print(err)
+        if err.errno == errno.ECONNREFUSED:
+            print('Hint: qemu is not running?')
+        sys.exit(1)
+
+    # NOTE(review): args[0] is read before _cmd_fsfreeze validates it;
+    # 'fsfreeze' with no arguments raises IndexError here.
+    # Freezing can take a while, so allow a longer sync timeout.
+    if cmd == 'fsfreeze' and args[0] == 'freeze':
+        client.sync(60)
+    elif cmd != 'ping':
+        client.sync()
+
+    globals()['_cmd_' + cmd](client, args)
+
+
+def main() -> None:
+    # The environment variable supplies a default, overridable with --address.
+    address = os.environ.get('QGA_CLIENT_ADDRESS')
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--address', action='store',
+                        default=address,
+                        help='Specify a ip:port pair or a unix socket path')
+    parser.add_argument('command', choices=commands)
+    parser.add_argument('args', nargs='*')
+
+    args = parser.parse_args()
+    if args.address is None:
+        parser.error('address is not specified')
+        # parser.error() already exits; this is a belt-and-braces fallback.
+        sys.exit(1)
+
+    send_command(args.address, args.command, args.args)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py
new file mode 100644
index 000000000..e7d7eb18f
--- /dev/null
+++ b/python/qemu/qmp/qmp_shell.py
@@ -0,0 +1,534 @@
+#
+# Copyright (C) 2009, 2010 Red Hat Inc.
+#
+# Authors:
+# Luiz Capitulino <lcapitulino@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+"""
+Low-level QEMU shell on top of QMP.
+
+usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
+
+positional arguments:
+ qmp_server < UNIX socket path | TCP address:port >
+
+optional arguments:
+ -h, --help show this help message and exit
+ -H, --hmp Use HMP interface
+ -N, --skip-negotiation
+ Skip negotiate (for qemu-ga)
+ -v, --verbose Verbose (echo commands sent and received)
+ -p, --pretty Pretty-print JSON
+
+
+Start QEMU with:
+
+# qemu [...] -qmp unix:./qmp-sock,server
+
+Run the shell:
+
+$ qmp-shell ./qmp-sock
+
+Commands have the following format:
+
+ < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+
+For example:
+
+(QEMU) device_add driver=e1000 id=net1
+{'return': {}}
+(QEMU)
+
+key=value pairs also support Python or JSON object literal subset notations,
+without spaces. Dictionaries/objects {} are supported as are arrays [].
+
 example-command arg-name1={'key':'value','obj':{'prop':"value"}}
+
+Both JSON and Python formatting should work, including both styles of
+string literal quotes. Both paradigms of literal values should work,
+including null/true/false for JSON and None/True/False for Python.
+
+
+Transactions have the following multi-line format:
+
+ transaction(
+ action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
+ ...
+ action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
+ )
+
+One line transactions are also supported:
+
+ transaction( action-name1 ... )
+
+For example:
+
+ (QEMU) transaction(
+ TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
+ TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
+ TRANS> )
+ {"return": {}}
+ (QEMU)
+
+Use the -v and -p options to activate the verbose and pretty-print options,
+which will echo back the properly formatted JSON-compliant QMP that is being
+sent to QEMU, which is useful for debugging and documentation generation.
+"""
+
+import argparse
+import ast
+import json
+import logging
+import os
+import re
+import readline
+import sys
+from typing import (
+ Iterator,
+ List,
+ NoReturn,
+ Optional,
+ Sequence,
+)
+
+from qemu import qmp
+from qemu.qmp import QMPMessage
+
+
+LOG = logging.getLogger(__name__)
+
+
class QMPCompleter:
    """
    Tab-completion helper suitable for use with the readline library.

    Collects known command names and serves them back as completion
    candidates via the readline completer protocol.
    """
    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Append a new valid completion to the list of possibilities."""
        return self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """readline.set_completer() callback implementation."""
        # Return the state-th candidate whose name starts with 'text',
        # or None once the candidates are exhausted.
        candidates = (name for name in self._matches
                      if name.startswith(text))
        for index, name in enumerate(candidates):
            if index == state:
                return name
        return None
+
+
class QMPShellError(qmp.QMPError):
    """
    QMP Shell Base error class.

    Raised for user-input problems, e.g. malformed command-line tokens
    or misuse of the transaction sub-shell.
    """
+
+
class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer mapping JSON keyword literals to Python constants.

    Rewrites bare 'true', 'false' and 'null' Name nodes into the
    equivalent Python Constant nodes so that ast.literal_eval can
    evaluate JSON-flavored input.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """
        Transform Name nodes with certain values into Constant (keyword) nodes.
        """
        keywords = {'true': True, 'false': False, 'null': None}
        if node.id in keywords:
            return ast.Constant(value=keywords[node.id])
        return node
+
+
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    QMPShell provides a basic readline-based QMP shell.

    :param address: Address of the QMP server.
    :param pretty: Pretty-print QMP messages.
    :param verbose: Echo outgoing QMP messages to console.
    """
    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address)
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        # True while inside a multi-line 'transaction( ... )' sub-shell.
        self._transmode = False
        self._actions: List[QMPMessage] = []
        # Shell history is persisted across sessions in the user's home.
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.pretty = pretty
        self.verbose = verbose

    def close(self) -> None:
        """Save shell history, then close the underlying QMP connection."""
        # Hook into context manager of parent to save shell history.
        self._save_history()
        super().close()

    def _fill_completion(self) -> None:
        """Populate tab-completion candidates from 'query-commands'."""
        cmds = self.cmd('query-commands')
        # Best-effort: skip completion setup if the query failed.
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def _completer_setup(self) -> None:
        """(Re-)initialize readline completion and load shell history."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            pass
        except IOError as err:
            msg = f"Failed to read history '{self._histfile}': {err!s}"
            LOG.warning(msg)

    def _save_history(self) -> None:
        """Write readline history back to the history file (best-effort)."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            msg = f"Failed to save history file '{self._histfile}': {err!s}"
            LOG.warning(msg)

    @classmethod
    def _parse_value(cls, val: str) -> object:
        """
        Parse one command-line value into a Python object.

        Tries, in order: integer, 'true'/'false' keywords, JSON (then
        'fuzzy' JSON accepting true/false/null) for values that look like
        objects or arrays; finally falls back to the raw string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def _cli_expr(self,
                  tokens: Sequence[str],
                  parent: qmp.QMPObject) -> None:
        """
        Parse a sequence of 'key=value' tokens into the 'parent' dict.

        Dotted keys (a.b.c=val) create nested dictionaries.

        :raise QMPShellError: on malformed or conflicting tokens.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self._parse_value(val)
            optpath = key.split('.')
            curpath: List[str] = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = parent.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                parent[path] = obj
                parent = obj
            if optpath[-1] in parent:
                if isinstance(parent[optpath[-1]], dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            parent[optpath[-1]] = value

    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
        """
        # Match bare words or quoted strings (with escapes), so spaces
        # inside quotes do not split an argument.
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry:
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            self._actions = []
            cmdargs.pop(0)

        # Transactional CLI exit:
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            return qmpcmd

        # No args, or no args remaining
        if not cmdargs:
            return None

        if self._transmode:
            # Parse and cache this Transactional Action
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            self._cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self._build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print a QMP message as JSON; pretty-printed if so configured."""
        jsobj = json.dumps(qmp_message,
                           indent=4 if self.pretty else None,
                           sort_keys=self.pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse and execute one command line.

        :return: False if the connection was lost, True otherwise
                 (parse errors are reported to stderr but are not fatal).
        """
        try:
            qmpcmd = self._build_cmd(cmdline)
        except QMPShellError as err:
            print(
                f"Error while parsing command line: {err!s}\n"
                "command format: <command-name> "
                "[arg-name1=arg1] ... [arg-nameN=argN",
                file=sys.stderr
            )
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """Connect to the server, then set up readline tab-completion."""
        self._greeting = super().connect(negotiate)
        self._completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """
        Print to stdio a greeting, and the QEMU version if available.
        """
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """
        Return the current shell prompt, including a trailing space.
        """
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            # An empty line drains and displays any pending async events.
            for event in self.get_events():
                print(event)
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Return an iterator that implements the REPL.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()
+
+
class HMPShell(QMPShell):
    """
    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.

    :param address: Address of the QMP server.
    :param pretty: Pretty-print QMP messages.
    :param verbose: Echo outgoing QMP messages to console.
    """
    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        # CPU whose context HMP commands run in; changed by the 'cpu' cmd.
        self._cpu_index = 0

    def _cmd_completion(self) -> None:
        """Collect command-name completion candidates from HMP 'help'."""
        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
            # Skip section headers ('[...]') and indented continuation text.
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
                name = cmd.split()[0]  # drop help text
                if name == 'info':
                    continue
                if name.find('|') != -1:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    if len(opt[0]) == 1:
                        name = opt[1]
                    else:
                        name = opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name)  # help completion

    def _info_completion(self) -> None:
        """Collect 'info <topic>' completion candidates from HMP 'info'."""
        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
            if cmd:
                self._completer.append('info ' + cmd.split()[1])

    def _other_completion(self) -> None:
        """Add candidates that cannot be discovered from help output."""
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Populate the completer from HMP output (overrides QMP variant)."""
        self._cmd_completion()
        self._info_completion()
        self._other_completion()

    def _cmd_passthrough(self, cmdline: str,
                         cpu_index: int = 0) -> QMPMessage:
        """Tunnel an HMP command through QMP 'human-monitor-command'."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """Execute one HMP command line; return False if disconnected."""
        if cmdline.split()[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(cmdline.split()[1])
                if 'return' not in self._cmd_passthrough('info version', idx):
                    print('bad CPU index')
                    return True
                self._cpu_index = idx
            except ValueError:
                print('cpu command takes an integer argument')
                return True
        resp = self._cmd_passthrough(cmdline, self._cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Print the HMP greeting (QEMU version handling via the parent)."""
        QMPShell.show_banner(self, msg)
+
+
def die(msg: str) -> NoReturn:
    """Write an error to stderr, then exit with a return code of 1."""
    # Used by main() to report connection-phase failures.
    sys.stderr.write('ERROR: %s\n' % msg)
    sys.exit(1)
+
+
def main() -> None:
    """
    qmp-shell entry point: parse command line arguments and start the REPL.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    # The QMP_SOCKET environment variable supplies a default server address.
    default_server = os.environ.get('QMP_SOCKET')
    parser.add_argument('qmp_server', action='store',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell

    try:
        address = shell_class.parse_address(args.qmp_server)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    # Context manager guarantees the socket (and history file) are closed.
    with shell_class(address, args.pretty, args.verbose) as qemu:
        try:
            qemu.connect(negotiate=not args.skip_negotiation)
        except qmp.QMPConnectError:
            die("Didn't get QMP greeting message")
        except qmp.QMPCapabilitiesError:
            die("Couldn't negotiate capabilities")
        except OSError as err:
            die(f"Couldn't connect to {args.qmp_server}: {err!s}")

        for _ in qemu.repl():
            pass
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/qemu/qmp/qom.py b/python/qemu/qmp/qom.py
new file mode 100644
index 000000000..8ff28a834
--- /dev/null
+++ b/python/qemu/qmp/qom.py
@@ -0,0 +1,272 @@
+"""
+QEMU Object Model testing tools.
+
+usage: qom [-h] {set,get,list,tree,fuse} ...
+
+Query and manipulate QOM data
+
+optional arguments:
+ -h, --help show this help message and exit
+
+QOM commands:
+ {set,get,list,tree,fuse}
+ set Set a QOM property value
+ get Get a QOM property value
+ list List QOM properties at a given path
+ tree Show QOM tree from a given path
+ fuse Mount a QOM tree as a FUSE filesystem
+"""
+##
+# Copyright John Snow 2020, for Red Hat, Inc.
+# Copyright IBM, Corp. 2011
+#
+# Authors:
+# John Snow <jsnow@redhat.com>
+# Anthony Liguori <aliguori@amazon.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or later.
+# See the COPYING file in the top-level directory.
+#
+# Based on ./scripts/qmp/qom-[set|get|tree|list]
+##
+
+import argparse
+
+from . import QMPResponseError
+from .qom_common import QOMCommand
+
+
+try:
+ from .qom_fuse import QOMFuse
+except ModuleNotFoundError as _err:
+ if _err.name != 'fuse':
+ raise
+else:
+ assert issubclass(QOMFuse, QOMCommand)
+
+
class QOMSet(QOMCommand):
    """
    QOM Command - Set a property to a given value.

    usage: qom-set [-h] [--socket SOCKET] <path>.<property> <value>

    Set a QOM property value

    positional arguments:
      <path>.<property>  QOM path and property, separated by a period '.'
      <value>            new QOM property value

    optional arguments:
      -h, --help         show this help message and exit
      --socket SOCKET, -s SOCKET
                         QMP socket path or address (addr:port). May also be
                         set via QMP_SOCKET environment variable.
    """
    name = 'set'
    help = 'Set a QOM property value'

    @classmethod
    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
        """Add the <path>.<property> and <value> positional arguments."""
        super().configure_parser(parser)
        cls.add_path_prop_arg(parser)
        parser.add_argument(
            'value',
            metavar='<value>',
            action='store',
            help='new QOM property value'
        )

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        # Tuple unpacking raises ValueError if no '.' separator is present.
        self.path, self.prop = args.path_prop.rsplit('.', 1)
        self.value = args.value

    def run(self) -> int:
        """Issue qom-set and print the raw response; returns 0."""
        rsp = self.qmp.command(
            'qom-set',
            path=self.path,
            property=self.prop,
            value=self.value
        )
        print(rsp)
        return 0
+
+
class QOMGet(QOMCommand):
    """
    QOM Command - Get a property's current value.

    usage: qom-get [-h] [--socket SOCKET] <path>.<property>

    Get a QOM property value

    positional arguments:
      <path>.<property>  QOM path and property, separated by a period '.'

    optional arguments:
      -h, --help         show this help message and exit
      --socket SOCKET, -s SOCKET
                         QMP socket path or address (addr:port). May also be
                         set via QMP_SOCKET environment variable.
    """
    name = 'get'
    help = 'Get a QOM property value'

    @classmethod
    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
        """Add the <path>.<property> positional argument."""
        super().configure_parser(parser)
        cls.add_path_prop_arg(parser)

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        # NB: rsplit() itself never raises ValueError; unpacking the result
        # into two names does when no '.' separator is present. The old
        # indexing form (tmp[1]) leaked an IndexError instead of the
        # intended friendly error. This also matches QOMSet's style.
        try:
            self.path, self.prop = args.path_prop.rsplit('.', 1)
        except ValueError as err:
            raise ValueError('Invalid format for <path>.<property>') from err

    def run(self) -> int:
        """Fetch and print the property value; returns 0 on success."""
        rsp = self.qmp.command(
            'qom-get',
            path=self.path,
            property=self.prop
        )
        # Dict-valued properties are printed one key per line for
        # readability; anything else is printed verbatim.
        if isinstance(rsp, dict):
            for key, value in rsp.items():
                print(f"{key}: {value}")
        else:
            print(rsp)
        return 0
+
+
class QOMList(QOMCommand):
    """
    QOM Command - List the properties at a given path.

    usage: qom-list [-h] [--socket SOCKET] <path>

    List QOM properties at a given path

    positional arguments:
      <path>      QOM path

    optional arguments:
      -h, --help  show this help message and exit
      --socket SOCKET, -s SOCKET
                  QMP socket path or address (addr:port). May also be
                  set via QMP_SOCKET environment variable.
    """
    name = 'list'
    help = 'List QOM properties at a given path'

    @classmethod
    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
        """Add the <path> positional argument."""
        super().configure_parser(parser)
        parser.add_argument(
            'path',
            metavar='<path>',
            action='store',
            help='QOM path',
        )

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        self.path = args.path

    def run(self) -> int:
        """Print one line per property; returns 0."""
        rsp = self.qom_list(self.path)
        # Children are marked with a trailing '/', links with a leading '@'.
        for item in rsp:
            if item.child:
                print(f"{item.name}/")
            elif item.link:
                print(f"@{item.name}/")
            else:
                print(item.name)
        return 0
+
+
class QOMTree(QOMCommand):
    """
    QOM Command - Show the full tree below a given path.

    usage: qom-tree [-h] [--socket SOCKET] [<path>]

    Show QOM tree from a given path

    positional arguments:
      <path>      QOM path

    optional arguments:
      -h, --help  show this help message and exit
      --socket SOCKET, -s SOCKET
                  QMP socket path or address (addr:port). May also be
                  set via QMP_SOCKET environment variable.
    """
    name = 'tree'
    help = 'Show QOM tree from a given path'

    @classmethod
    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
        """Add the optional <path> argument (defaults to the root, '/')."""
        super().configure_parser(parser)
        parser.add_argument(
            'path',
            metavar='<path>',
            action='store',
            help='QOM path',
            nargs='?',
            default='/'
        )

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        self.path = args.path

    def _list_node(self, path: str) -> None:
        """Print the properties at 'path', then recurse into children."""
        print(path)
        items = self.qom_list(path)
        # First pass: print non-child (leaf) properties and their values.
        for item in items:
            if item.child:
                continue
            try:
                rsp = self.qmp.command('qom-get', path=path,
                                       property=item.name)
                print(f"  {item.name}: {rsp} ({item.type})")
            except QMPResponseError as err:
                print(f"  {item.name}: <EXCEPTION: {err!s}> ({item.type})")
        print('')
        # Second pass: descend into each child node.
        for item in items:
            if not item.child:
                continue
            # Avoid a doubled slash ('//...') when starting at the root.
            if path == '/':
                path = ''
            self._list_node(f"{path}/{item.name}")

    def run(self) -> int:
        """Recursively print the tree rooted at self.path; returns 0."""
        self._list_node(self.path)
        return 0
+
+
def main() -> int:
    """QOM script main entry point."""
    parser = argparse.ArgumentParser(
        description='Query and manipulate QOM data'
    )
    subparsers = parser.add_subparsers(
        title='QOM commands',
        dest='command'
    )

    # Each QOMCommand subclass registers its own sub-parser and stores
    # itself as the 'cmd_class' default for dispatch below.
    for command in QOMCommand.__subclasses__():
        command.register(subparsers)

    args = parser.parse_args()

    if args.command is None:
        parser.error('Command not specified.')
        return 1  # unreachable: parser.error() exits the process

    cmd_class = args.cmd_class
    assert isinstance(cmd_class, type(QOMCommand))
    return cmd_class.command_runner(args)
diff --git a/python/qemu/qmp/qom_common.py b/python/qemu/qmp/qom_common.py
new file mode 100644
index 000000000..a59ae1a2a
--- /dev/null
+++ b/python/qemu/qmp/qom_common.py
@@ -0,0 +1,178 @@
+"""
+QOM Command abstractions.
+"""
+##
+# Copyright John Snow 2020, for Red Hat, Inc.
+# Copyright IBM, Corp. 2011
+#
+# Authors:
+# John Snow <jsnow@redhat.com>
+# Anthony Liguori <aliguori@amazon.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or later.
+# See the COPYING file in the top-level directory.
+#
+# Based on ./scripts/qmp/qom-[set|get|tree|list]
+##
+
+import argparse
+import os
+import sys
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Type,
+ TypeVar,
+)
+
+from . import QEMUMonitorProtocol, QMPError
+
+
+# The following is needed only for a type alias.
+Subparsers = argparse._SubParsersAction # pylint: disable=protected-access
+
+
class ObjectPropertyInfo:
    """
    Represents the return type from e.g. qom-list.
    """
    def __init__(self, name: str, type_: str,
                 description: Optional[str] = None,
                 default_value: Optional[object] = None):
        self.name = name
        self.type = type_
        self.description = description
        self.default_value = default_value

    @classmethod
    def make(cls, value: Dict[str, Any]) -> 'ObjectPropertyInfo':
        """
        Build an ObjectPropertyInfo from a Dict with an unknown shape.
        """
        # 'name' and 'type' are mandatory; only the two optional keys
        # below may additionally appear.
        keys = set(value.keys())
        assert {'name', 'type'} <= keys
        assert keys <= {'name', 'type', 'description', 'default-value'}
        return cls(value['name'], value['type'],
                   description=value.get('description'),
                   default_value=value.get('default-value'))

    @property
    def child(self) -> bool:
        """Is this property a child property?"""
        return self.type.startswith('child<')

    @property
    def link(self) -> bool:
        """Is this property a link property?"""
        return self.type.startswith('link<')
+
+
+CommandT = TypeVar('CommandT', bound='QOMCommand')
+
+
class QOMCommand:
    """
    Represents a QOM sub-command.

    :param args: Parsed arguments, as returned from parser.parse_args.
    """
    # Subclasses must define both of these class attributes:
    name: str   # sub-command name as typed on the CLI
    help: str   # one-line help/description text

    def __init__(self, args: argparse.Namespace):
        # Connects eagerly; raises QMPError when no socket was given.
        if args.socket is None:
            raise QMPError("No QMP socket path or address given")
        self.qmp = QEMUMonitorProtocol(
            QEMUMonitorProtocol.parse_address(args.socket)
        )
        self.qmp.connect()

    @classmethod
    def register(cls, subparsers: Subparsers) -> None:
        """
        Register this command with the argument parser.

        :param subparsers: argparse subparsers object, from "add_subparsers".
        """
        subparser = subparsers.add_parser(cls.name, help=cls.help,
                                          description=cls.help)
        cls.configure_parser(subparser)

    @classmethod
    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
        """
        Configure a parser with this command's arguments.

        :param parser: argparse parser or subparser object.
        """
        default_path = os.environ.get('QMP_SOCKET')
        parser.add_argument(
            '--socket', '-s',
            dest='socket',
            action='store',
            help='QMP socket path or address (addr:port).'
            ' May also be set via QMP_SOCKET environment variable.',
            default=default_path
        )
        # Record the concrete command class for dispatch in main().
        parser.set_defaults(cmd_class=cls)

    @classmethod
    def add_path_prop_arg(cls, parser: argparse.ArgumentParser) -> None:
        """
        Add the <path>.<property> positional argument to this command.

        :param parser: The parser to add the argument to.
        """
        parser.add_argument(
            'path_prop',
            metavar='<path>.<property>',
            action='store',
            help="QOM path and property, separated by a period '.'"
        )

    def run(self) -> int:
        """
        Run this command.

        :return: 0 on success, 1 otherwise.
        """
        raise NotImplementedError

    def qom_list(self, path: str) -> List[ObjectPropertyInfo]:
        """
        :return: a strongly typed list from the 'qom-list' command.
        """
        rsp = self.qmp.command('qom-list', path=path)
        # qom-list returns List[ObjectPropertyInfo]
        assert isinstance(rsp, list)
        return [ObjectPropertyInfo.make(x) for x in rsp]

    @classmethod
    def command_runner(
            cls: Type[CommandT],
            args: argparse.Namespace
    ) -> int:
        """
        Run a fully-parsed subcommand, with error-handling for the CLI.

        :return: The return code from `run()`, or -1 if a QMPError
                 occurred (the error is printed to stderr).
        """
        try:
            cmd = cls(args)
            return cmd.run()
        except QMPError as err:
            print(f"{type(err).__name__}: {err!s}", file=sys.stderr)
            return -1

    @classmethod
    def entry_point(cls) -> int:
        """
        Build this command's parser, parse arguments, and run the command.

        :return: `run`'s return code.
        """
        parser = argparse.ArgumentParser(description=cls.help)
        cls.configure_parser(parser)
        args = parser.parse_args()
        return cls.command_runner(args)
diff --git a/python/qemu/qmp/qom_fuse.py b/python/qemu/qmp/qom_fuse.py
new file mode 100644
index 000000000..43f4671fd
--- /dev/null
+++ b/python/qemu/qmp/qom_fuse.py
@@ -0,0 +1,206 @@
+"""
+QEMU Object Model FUSE filesystem tool
+
+This script offers a simple FUSE filesystem within which the QOM tree
+may be browsed, queried and edited using traditional shell tooling.
+
+This script requires the 'fusepy' python package.
+
+
+usage: qom-fuse [-h] [--socket SOCKET] <mount>
+
+Mount a QOM tree as a FUSE filesystem
+
+positional arguments:
+ <mount> Mount point
+
+optional arguments:
+ -h, --help show this help message and exit
+ --socket SOCKET, -s SOCKET
+ QMP socket path or address (addr:port). May also be
+ set via QMP_SOCKET environment variable.
+"""
+##
+# Copyright IBM, Corp. 2012
+# Copyright (C) 2020 Red Hat, Inc.
+#
+# Authors:
+# Anthony Liguori <aliguori@us.ibm.com>
+# Markus Armbruster <armbru@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or later.
+# See the COPYING file in the top-level directory.
+##
+
+import argparse
+from errno import ENOENT, EPERM
+import stat
+import sys
+from typing import (
+ IO,
+ Dict,
+ Iterator,
+ Mapping,
+ Optional,
+ Union,
+)
+
+import fuse
+from fuse import FUSE, FuseOSError, Operations
+
+from . import QMPResponseError
+from .qom_common import QOMCommand
+
+
+fuse.fuse_python_api = (0, 2)
+
+
class QOMFuse(QOMCommand, Operations):
    """
    QOMFuse implements both fuse.Operations and QOMCommand.

    Operations implements the FS, and QOMCommand implements the CLI command.
    """
    name = 'fuse'
    help = 'Mount a QOM tree as a FUSE filesystem'
    fuse: FUSE

    @classmethod
    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
        """Add the <mount> positional argument."""
        super().configure_parser(parser)
        parser.add_argument(
            'mount',
            metavar='<mount>',
            action='store',
            help="Mount point",
        )

    def __init__(self, args: argparse.Namespace):
        super().__init__(args)
        self.mount = args.mount
        # Maps QOM paths to synthetic, stable inode numbers.
        self.ino_map: Dict[str, int] = {}
        self.ino_count = 1

    def run(self) -> int:
        """Mount the filesystem and serve requests until unmounted."""
        print(f"Mounting QOMFS to '{self.mount}'", file=sys.stderr)
        self.fuse = FUSE(self, self.mount, foreground=True)
        return 0

    def get_ino(self, path: str) -> int:
        """Get an inode number for a given QOM path."""
        if path in self.ino_map:
            return self.ino_map[path]
        self.ino_map[path] = self.ino_count
        self.ino_count += 1
        return self.ino_map[path]

    def is_object(self, path: str) -> bool:
        """Is the given QOM path an object?"""
        try:
            self.qom_list(path)
            return True
        except QMPResponseError:
            return False

    def is_property(self, path: str) -> bool:
        """Is the given QOM path a property?"""
        path, prop = path.rsplit('/', 1)
        if path == '':
            path = '/'
        try:
            for item in self.qom_list(path):
                if item.name == prop:
                    return True
            return False
        except QMPResponseError:
            return False

    def is_link(self, path: str) -> bool:
        """Is the given QOM path a link?"""
        path, prop = path.rsplit('/', 1)
        if path == '':
            path = '/'
        try:
            for item in self.qom_list(path):
                if item.name == prop and item.link:
                    return True
            return False
        except QMPResponseError:
            return False

    def read(self, path: str, size: int, offset: int, fh: IO[bytes]) -> bytes:
        """Read up to 'size' bytes at 'offset' from the property at 'path'."""
        if not self.is_property(path):
            raise FuseOSError(ENOENT)

        path, prop = path.rsplit('/', 1)
        if path == '':
            path = '/'
        try:
            data = str(self.qmp.command('qom-get', path=path, property=prop))
            data += '\n'  # make values shell friendly
        except QMPResponseError as err:
            raise FuseOSError(EPERM) from err

        if offset > len(data):
            return b''

        return bytes(data[offset:][:size], encoding='utf-8')

    def readlink(self, path: str) -> Union[bool, str]:
        """Return a relative link target for 'path', or False if not a link."""
        if not self.is_link(path):
            return False
        path, prop = path.rsplit('/', 1)
        prefix = '/'.join(['..'] * (len(path.split('/')) - 1))
        return prefix + str(self.qmp.command('qom-get', path=path,
                                             property=prop))

    def _stat_entry(self, path: str, mode: int,
                    nlink: int) -> Dict[str, object]:
        """
        Build the stat-like mapping shared by all getattr() branches.

        Only the mode and the link count vary between entry types; the
        uid/gid/size/time fields are fixed synthetic values.
        """
        return {
            'st_mode': mode,
            'st_ino': self.get_ino(path),
            'st_dev': 0,
            'st_nlink': nlink,
            'st_uid': 1000,
            'st_gid': 1000,
            'st_size': 4096,
            'st_atime': 0,
            'st_mtime': 0,
            'st_ctime': 0
        }

    def getattr(self, path: str,
                fh: Optional[IO[bytes]] = None) -> Mapping[str, object]:
        """Return stat attributes: link, object (dir) or property (file)."""
        if self.is_link(path):
            return self._stat_entry(path, 0o755 | stat.S_IFLNK, 2)
        if self.is_object(path):
            return self._stat_entry(path, 0o755 | stat.S_IFDIR, 2)
        if self.is_property(path):
            return self._stat_entry(path, 0o644 | stat.S_IFREG, 1)
        raise FuseOSError(ENOENT)

    def readdir(self, path: str, fh: IO[bytes]) -> Iterator[str]:
        """Yield the directory entries of the QOM object at 'path'."""
        yield '.'
        yield '..'
        for item in self.qom_list(path):
            yield item.name
diff --git a/python/qemu/utils/README.rst b/python/qemu/utils/README.rst
new file mode 100644
index 000000000..d5f2da145
--- /dev/null
+++ b/python/qemu/utils/README.rst
@@ -0,0 +1,7 @@
+qemu.utils package
+==================
+
+This package provides miscellaneous utilities used for testing and
+debugging QEMU. It is used primarily by the vm and avocado tests.
+
+See the documentation in ``__init__.py`` for more information.
diff --git a/python/qemu/utils/__init__.py b/python/qemu/utils/__init__.py
new file mode 100644
index 000000000..7f1a5138c
--- /dev/null
+++ b/python/qemu/utils/__init__.py
@@ -0,0 +1,45 @@
+"""
+QEMU development and testing utilities
+
+This package provides a small handful of utilities for performing
+various tasks not directly related to the launching of a VM.
+"""
+
+# Copyright (C) 2021 Red Hat Inc.
+#
+# Authors:
+# John Snow <jsnow@redhat.com>
+# Cleber Rosa <crosa@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+import re
+from typing import Optional
+
+# pylint: disable=import-error
+from .accel import kvm_available, list_accel, tcg_available
+
+
+__all__ = (
+ 'get_info_usernet_hostfwd_port',
+ 'kvm_available',
+ 'list_accel',
+ 'tcg_available',
+)
+
+
def get_info_usernet_hostfwd_port(info_usernet_output: str) -> Optional[int]:
    """
    Returns the port given to the hostfwd parameter via info usernet

    :param info_usernet_output: output generated by hmp command "info usernet"
    :return: the port number allocated by the hostfwd option
    """
    # The pattern is loop-invariant: compile it once instead of
    # rebuilding it for every line of output.
    regex = re.compile(r'TCP.HOST_FORWARD.*127\.0\.0\.1\s+(\d+)\s+10\.')
    for line in info_usernet_output.split('\r\n'):
        match = regex.search(line)
        if match is not None:
            return int(match[1])
    return None
diff --git a/python/qemu/utils/accel.py b/python/qemu/utils/accel.py
new file mode 100644
index 000000000..386ff640c
--- /dev/null
+++ b/python/qemu/utils/accel.py
@@ -0,0 +1,84 @@
+"""
+QEMU accel module:
+
+This module provides utilities for discovering and checking the availability
+of accelerators.
+"""
+# Copyright (C) 2015-2016 Red Hat Inc.
+# Copyright (C) 2012 IBM Corp.
+#
+# Authors:
+# Fam Zheng <famz@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+import logging
+import os
+import subprocess
+from typing import List, Optional
+
+
+LOG = logging.getLogger(__name__)
+
+# Mapping from host architecture to any additional architectures it can
+# support, which often includes its 32-bit cousin.
+ADDITIONAL_ARCHES = {
+ "x86_64": "i386",
+ "aarch64": "armhf",
+ "ppc64le": "ppc64",
+}
+
+
+def list_accel(qemu_bin: str) -> List[str]:
+ """
+ List accelerators enabled in the QEMU binary.
+
+ @param qemu_bin (str): path to the QEMU binary.
+ @raise Exception: if failed to run ``qemu -accel help``
+ @return a list of accelerator names.
+ """
+ if not qemu_bin:
+ return []
+ try:
+ out = subprocess.check_output([qemu_bin, '-accel', 'help'],
+ universal_newlines=True)
+ except:
+ LOG.debug("Failed to get the list of accelerators in %s", qemu_bin)
+ raise
+ # Skip the first line which is the header.
+ return [acc.strip() for acc in out.splitlines()[1:]]
+
+
+def kvm_available(target_arch: Optional[str] = None,
+ qemu_bin: Optional[str] = None) -> bool:
+ """
+ Check if KVM is available using the following heuristic:
+ - Kernel module is present in the host;
+ - Target and host arches don't mismatch;
+ - KVM is enabled in the QEMU binary.
+
+ @param target_arch (str): target architecture
+ @param qemu_bin (str): path to the QEMU binary
+ @return True if kvm is available, otherwise False.
+ """
+ if not os.access("/dev/kvm", os.R_OK | os.W_OK):
+ return False
+ if target_arch:
+ host_arch = os.uname()[4]
+ if target_arch != host_arch:
+ if target_arch != ADDITIONAL_ARCHES.get(host_arch):
+ return False
+ if qemu_bin and "kvm" not in list_accel(qemu_bin):
+ return False
+ return True
+
+
+def tcg_available(qemu_bin: str) -> bool:
+ """
+ Check if TCG is available.
+
+ @param qemu_bin (str): path to the QEMU binary
+ """
+ return 'tcg' in list_accel(qemu_bin)
diff --git a/python/qemu/utils/py.typed b/python/qemu/utils/py.typed
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/qemu/utils/py.typed
diff --git a/python/setup.cfg b/python/setup.cfg
new file mode 100644
index 000000000..417e93783
--- /dev/null
+++ b/python/setup.cfg
@@ -0,0 +1,177 @@
+[metadata]
+name = qemu
+version = file:VERSION
+maintainer = QEMU Developer Team
+maintainer_email = qemu-devel@nongnu.org
+url = https://www.qemu.org/
+download_url = https://www.qemu.org/download/
+description = QEMU Python Build, Debug and SDK tooling.
+long_description = file:PACKAGE.rst
+long_description_content_type = text/x-rst
+classifiers =
+ Development Status :: 3 - Alpha
+ License :: OSI Approved :: GNU General Public License v2 (GPLv2)
+ Natural Language :: English
+ Operating System :: OS Independent
+ Programming Language :: Python :: 3 :: Only
+ Programming Language :: Python :: 3.6
+ Programming Language :: Python :: 3.7
+ Programming Language :: Python :: 3.8
+ Programming Language :: Python :: 3.9
+ Programming Language :: Python :: 3.10
+ Typing :: Typed
+
+[options]
+python_requires = >= 3.6
+packages =
+ qemu.qmp
+ qemu.machine
+ qemu.utils
+ qemu.aqmp
+
+[options.package_data]
+* = py.typed
+
+[options.extras_require]
+# For the devel group, when adding new dependencies or bumping the minimum
+# version, use e.g. "pipenv install --dev pylint==3.0.0".
+# Subsequently, edit 'Pipfile' to remove e.g. 'pylint = "==3.0.0"'.
+devel =
+ avocado-framework >= 90.0
+ flake8 >= 3.6.0
+ fusepy >= 2.0.4
+ isort >= 5.1.2
+ mypy >= 0.770
+ pylint >= 2.8.0
+ tox >= 3.18.0
+ urwid >= 2.1.2
+ urwid-readline >= 0.13
+ Pygments >= 2.9.0
+
+# Provides qom-fuse functionality
+fuse =
+ fusepy >= 2.0.4
+
+# AQMP TUI dependencies
+tui =
+ urwid >= 2.1.2
+ urwid-readline >= 0.13
+ Pygments >= 2.9.0
+
+[options.entry_points]
+console_scripts =
+ qom = qemu.qmp.qom:main
+ qom-set = qemu.qmp.qom:QOMSet.entry_point
+ qom-get = qemu.qmp.qom:QOMGet.entry_point
+ qom-list = qemu.qmp.qom:QOMList.entry_point
+ qom-tree = qemu.qmp.qom:QOMTree.entry_point
+ qom-fuse = qemu.qmp.qom_fuse:QOMFuse.entry_point [fuse]
+ qemu-ga-client = qemu.qmp.qemu_ga_client:main
+ qmp-shell = qemu.qmp.qmp_shell:main
+ aqmp-tui = qemu.aqmp.aqmp_tui:main [tui]
+
+[flake8]
+extend-ignore = E722 # Prefer pylint's bare-except checks to flake8's
+exclude = __pycache__,
+
+[mypy]
+strict = True
+python_version = 3.6
+warn_unused_configs = True
+namespace_packages = True
+
+[mypy-qemu.qmp.qom_fuse]
+# fusepy has no type stubs:
+allow_subclassing_any = True
+
+[mypy-qemu.aqmp.aqmp_tui]
+# urwid and urwid_readline have no type stubs:
+allow_subclassing_any = True
+
+# The following missing import directives are because these libraries do not
+# provide type stubs. Allow them on an as-needed basis for mypy.
+[mypy-fuse]
+ignore_missing_imports = True
+
+[mypy-urwid]
+ignore_missing_imports = True
+
+[mypy-urwid_readline]
+ignore_missing_imports = True
+
+[mypy-pygments]
+ignore_missing_imports = True
+
+[pylint.messages control]
+# Disable the message, report, category or checker with the given id(s). You
+# can either give multiple identifiers separated by comma (,) or put this
+# option multiple times (only on the command line, not in the configuration
+# file where it should appear only once). You can also use "--disable=all" to
+# disable everything first and then reenable specific checks. For example, if
+# you want to run only the similarities checker, you can use "--disable=all
+# --enable=similarities". If you want to run only the classes checker, but have
+# no Warning level messages displayed, use "--disable=all --enable=classes
+# --disable=W".
+disable=consider-using-f-string,
+ too-many-function-args, # mypy handles this with less false positives.
+ no-member, # mypy also handles this better.
+
+[pylint.basic]
+# Good variable names which should always be accepted, separated by a comma.
+good-names=i,
+ j,
+ k,
+ ex,
+ Run,
+ _, # By convention: Unused variable
+ fh, # fh = open(...)
+ fd, # fd = os.open(...)
+ c, # for c in string: ...
+ T, # for TypeVars. See pylint#3401
+
+[pylint.similarities]
+# Ignore imports when computing similarities.
+ignore-imports=yes
+ignore-signatures=yes
+
+# Minimum lines number of a similarity.
+# TODO: Remove after we opt in to Pylint 2.8.3. See commit msg.
+min-similarity-lines=6
+
+
+[isort]
+force_grid_wrap=4
+force_sort_within_sections=True
+include_trailing_comma=True
+line_length=72
+lines_after_imports=2
+multi_line_output=3
+
+# tox (https://tox.readthedocs.io/) is a tool for running tests in
+# multiple virtualenvs. This configuration file will run the test suite
+# on all supported python versions. To use it, "pip install tox" and
+# then run "tox" from this directory. You will need all of these versions
+# of python available on your system to run this test.
+
+[tox:tox]
+envlist = py36, py37, py38, py39, py310
+skip_missing_interpreters = true
+
+[testenv]
+allowlist_externals = make
+deps =
+ .[devel]
+ .[fuse] # Workaround to trigger tox venv rebuild
+ .[tui] # Workaround to trigger tox venv rebuild
+commands =
+ make check
+
+# Coverage.py [https://coverage.readthedocs.io/en/latest/] is a tool for
+# measuring code coverage of Python programs. It monitors your program,
+# noting which parts of the code have been executed, then analyzes the
+# source to identify code that could have been executed but was not.
+
+[coverage:run]
+concurrency = multiprocessing
+source = qemu/
+parallel = true
diff --git a/python/setup.py b/python/setup.py
new file mode 100755
index 000000000..2014f81b7
--- /dev/null
+++ b/python/setup.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python3
+"""
+QEMU tooling installer script
+Copyright (c) 2020-2021 John Snow for Red Hat, Inc.
+"""
+
+import setuptools
+import pkg_resources
+
+
+def main():
+ """
+ QEMU tooling installer
+ """
+
+ # https://medium.com/@daveshawley/safely-using-setup-cfg-for-metadata-1babbe54c108
+ pkg_resources.require('setuptools>=39.2')
+
+ setuptools.setup()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/tests/flake8.sh b/python/tests/flake8.sh
new file mode 100755
index 000000000..1cd7d40fa
--- /dev/null
+++ b/python/tests/flake8.sh
@@ -0,0 +1,2 @@
+#!/bin/sh -e
+python3 -m flake8 qemu/
diff --git a/python/tests/iotests-mypy.sh b/python/tests/iotests-mypy.sh
new file mode 100755
index 000000000..ee7647081
--- /dev/null
+++ b/python/tests/iotests-mypy.sh
@@ -0,0 +1,4 @@
+#!/bin/sh -e
+
+cd ../tests/qemu-iotests/
+python3 -m linters --mypy
diff --git a/python/tests/iotests-pylint.sh b/python/tests/iotests-pylint.sh
new file mode 100755
index 000000000..4cae03424
--- /dev/null
+++ b/python/tests/iotests-pylint.sh
@@ -0,0 +1,4 @@
+#!/bin/sh -e
+
+cd ../tests/qemu-iotests/
+python3 -m linters --pylint
diff --git a/python/tests/isort.sh b/python/tests/isort.sh
new file mode 100755
index 000000000..4480405bf
--- /dev/null
+++ b/python/tests/isort.sh
@@ -0,0 +1,2 @@
+#!/bin/sh -e
+python3 -m isort -c qemu/
diff --git a/python/tests/mypy.sh b/python/tests/mypy.sh
new file mode 100755
index 000000000..5f980f563
--- /dev/null
+++ b/python/tests/mypy.sh
@@ -0,0 +1,2 @@
+#!/bin/sh -e
+python3 -m mypy -p qemu
diff --git a/python/tests/protocol.py b/python/tests/protocol.py
new file mode 100644
index 000000000..5cd7938be
--- /dev/null
+++ b/python/tests/protocol.py
@@ -0,0 +1,583 @@
+import asyncio
+from contextlib import contextmanager
+import os
+import socket
+from tempfile import TemporaryDirectory
+
+import avocado
+
+from qemu.aqmp import ConnectError, Runstate
+from qemu.aqmp.protocol import AsyncProtocol, StateError
+from qemu.aqmp.util import asyncio_run, create_task
+
+
+class NullProtocol(AsyncProtocol[None]):
+ """
+ NullProtocol is a test mockup of an AsyncProtocol implementation.
+
+ It adds a fake_session instance variable that enables a code path
+ that bypasses the actual connection logic, but still allows the
+ reader/writers to start.
+
+ Because the message type is defined as None, an asyncio.Event named
+ 'trigger_input' is created that prohibits the reader from
+ incessantly being able to yield None; this event can be poked to
+ simulate an incoming message.
+
+ For testing symmetry with do_recv, an interface is added to "send" a
+ Null message.
+
+ For testing purposes, a "simulate_disconnection" method is also
+ added which allows us to trigger a bottom half disconnect without
+ injecting any real errors into the reader/writer loops; in essence
+ it performs exactly half of what disconnect() normally does.
+ """
+ def __init__(self, name=None):
+ self.fake_session = False
+ self.trigger_input: asyncio.Event
+ super().__init__(name)
+
+ async def _establish_session(self):
+ self.trigger_input = asyncio.Event()
+ await super()._establish_session()
+
+ async def _do_accept(self, address, ssl=None):
+ if not self.fake_session:
+ await super()._do_accept(address, ssl)
+
+ async def _do_connect(self, address, ssl=None):
+ if not self.fake_session:
+ await super()._do_connect(address, ssl)
+
+ async def _do_recv(self) -> None:
+ await self.trigger_input.wait()
+ self.trigger_input.clear()
+
+ def _do_send(self, msg: None) -> None:
+ pass
+
+ async def send_msg(self) -> None:
+ await self._outgoing.put(None)
+
+ async def simulate_disconnect(self) -> None:
+ """
+ Simulates a bottom-half disconnect.
+
+ This method schedules a disconnection but does not wait for it
+ to complete. This is used to put the loop into the DISCONNECTING
+ state without fully quiescing it back to IDLE. This is normally
+ something you cannot coax AsyncProtocol to do on purpose, but it
+ will be similar to what happens with an unhandled Exception in
+ the reader/writer.
+
+ Under normal circumstances, the library design requires you to
+ await on disconnect(), which awaits the disconnect task and
+ returns bottom half errors as a pre-condition to allowing the
+ loop to return back to IDLE.
+ """
+ self._schedule_disconnect()
+
+
+class LineProtocol(AsyncProtocol[str]):
+ def __init__(self, name=None):
+ super().__init__(name)
+ self.rx_history = []
+
+ async def _do_recv(self) -> str:
+ raw = await self._readline()
+ msg = raw.decode()
+ self.rx_history.append(msg)
+ return msg
+
+ def _do_send(self, msg: str) -> None:
+ assert self._writer is not None
+ self._writer.write(msg.encode() + b'\n')
+
+ async def send_msg(self, msg: str) -> None:
+ await self._outgoing.put(msg)
+
+
+def run_as_task(coro, allow_cancellation=False):
+ """
+ Run a given coroutine as a task.
+
+ Optionally, wrap it in a try..except block that allows this
+ coroutine to be canceled gracefully.
+ """
+ async def _runner():
+ try:
+ await coro
+ except asyncio.CancelledError:
+ if allow_cancellation:
+ return
+ raise
+ return create_task(_runner())
+
+
+@contextmanager
+def jammed_socket():
+ """
+ Opens up a random unused TCP port on localhost, then jams it.
+ """
+ socks = []
+
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('127.0.0.1', 0))
+ sock.listen(1)
+ address = sock.getsockname()
+
+ socks.append(sock)
+
+ # I don't *fully* understand why, but it takes *two* un-accepted
+ # connections to start jamming the socket.
+ for _ in range(2):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(address)
+ socks.append(sock)
+
+ yield address
+
+ finally:
+ for sock in socks:
+ sock.close()
+
+
+class Smoke(avocado.Test):
+
+ def setUp(self):
+ self.proto = NullProtocol()
+
+ def test__repr__(self):
+ self.assertEqual(
+ repr(self.proto),
+ "<NullProtocol runstate=IDLE>"
+ )
+
+ def testRunstate(self):
+ self.assertEqual(
+ self.proto.runstate,
+ Runstate.IDLE
+ )
+
+ def testDefaultName(self):
+ self.assertEqual(
+ self.proto.name,
+ None
+ )
+
+ def testLogger(self):
+ self.assertEqual(
+ self.proto.logger.name,
+ 'qemu.aqmp.protocol'
+ )
+
+ def testName(self):
+ self.proto = NullProtocol('Steve')
+
+ self.assertEqual(
+ self.proto.name,
+ 'Steve'
+ )
+
+ self.assertEqual(
+ self.proto.logger.name,
+ 'qemu.aqmp.protocol.Steve'
+ )
+
+ self.assertEqual(
+ repr(self.proto),
+ "<NullProtocol name='Steve' runstate=IDLE>"
+ )
+
+
+class TestBase(avocado.Test):
+
+ def setUp(self):
+ self.proto = NullProtocol(type(self).__name__)
+ self.assertEqual(self.proto.runstate, Runstate.IDLE)
+ self.runstate_watcher = None
+
+ def tearDown(self):
+ self.assertEqual(self.proto.runstate, Runstate.IDLE)
+
+ async def _asyncSetUp(self):
+ pass
+
+ async def _asyncTearDown(self):
+ if self.runstate_watcher:
+ await self.runstate_watcher
+
+ @staticmethod
+ def async_test(async_test_method):
+ """
+ Decorator; adds SetUp and TearDown to async tests.
+ """
+ async def _wrapper(self, *args, **kwargs):
+ loop = asyncio.get_event_loop()
+ loop.set_debug(True)
+
+ await self._asyncSetUp()
+ await async_test_method(self, *args, **kwargs)
+ await self._asyncTearDown()
+
+ return _wrapper
+
+ # Definitions
+
+ # The states we expect a "bad" connect/accept attempt to transition through
+ BAD_CONNECTION_STATES = (
+ Runstate.CONNECTING,
+ Runstate.DISCONNECTING,
+ Runstate.IDLE,
+ )
+
+ # The states we expect a "good" session to transition through
+ GOOD_CONNECTION_STATES = (
+ Runstate.CONNECTING,
+ Runstate.RUNNING,
+ Runstate.DISCONNECTING,
+ Runstate.IDLE,
+ )
+
+ # Helpers
+
+ async def _watch_runstates(self, *states):
+ """
+ This launches a task alongside (most) tests below to confirm that
+ the sequence of runstate changes that occur is exactly as
+ anticipated.
+ """
+ async def _watcher():
+ for state in states:
+ new_state = await self.proto.runstate_changed()
+ self.assertEqual(
+ new_state,
+ state,
+ msg=f"Expected state '{state.name}'",
+ )
+
+ self.runstate_watcher = create_task(_watcher())
+ # Kick the loop and force the task to block on the event.
+ await asyncio.sleep(0)
+
+
+class State(TestBase):
+
+ @TestBase.async_test
+ async def testSuperfluousDisconnect(self):
+ """
+ Test calling disconnect() while already disconnected.
+ """
+ await self._watch_runstates(
+ Runstate.DISCONNECTING,
+ Runstate.IDLE,
+ )
+ await self.proto.disconnect()
+
+
+class Connect(TestBase):
+ """
+ Tests primarily related to calling Connect().
+ """
+ async def _bad_connection(self, family: str):
+ assert family in ('INET', 'UNIX')
+
+ if family == 'INET':
+ await self.proto.connect(('127.0.0.1', 0))
+ elif family == 'UNIX':
+ await self.proto.connect('/dev/null')
+
+ async def _hanging_connection(self):
+ with jammed_socket() as addr:
+ await self.proto.connect(addr)
+
+ async def _bad_connection_test(self, family: str):
+ await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+
+ with self.assertRaises(ConnectError) as context:
+ await self._bad_connection(family)
+
+ self.assertIsInstance(context.exception.exc, OSError)
+ self.assertEqual(
+ context.exception.error_message,
+ "Failed to establish connection"
+ )
+
+ @TestBase.async_test
+ async def testBadINET(self):
+ """
+ Test an immediately rejected call to an IP target.
+ """
+ await self._bad_connection_test('INET')
+
+ @TestBase.async_test
+ async def testBadUNIX(self):
+ """
+ Test an immediately rejected call to a UNIX socket target.
+ """
+ await self._bad_connection_test('UNIX')
+
+ @TestBase.async_test
+ async def testCancellation(self):
+ """
+ Test what happens when a connection attempt is aborted.
+ """
+ # Note that accept() cannot be cancelled outright, as it isn't a task.
+ # However, we can wrap it in a task and cancel *that*.
+ await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+ task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+ state = await self.proto.runstate_changed()
+ self.assertEqual(state, Runstate.CONNECTING)
+
+ # This is insider baseball, but the connection attempt has
+ # yielded *just* before the actual connection attempt, so kick
+ # the loop to make sure it's truly wedged.
+ await asyncio.sleep(0)
+
+ task.cancel()
+ await task
+
+ @TestBase.async_test
+ async def testTimeout(self):
+ """
+ Test what happens when a connection attempt times out.
+ """
+ await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+ task = run_as_task(self._hanging_connection())
+
+ # More insider baseball: to improve the speed of this test while
+ # guaranteeing that the connection even gets a chance to start,
+ # verify that the connection hangs *first*, then await the
+ # result of the task with a nearly-zero timeout.
+
+ state = await self.proto.runstate_changed()
+ self.assertEqual(state, Runstate.CONNECTING)
+ await asyncio.sleep(0)
+
+ with self.assertRaises(asyncio.TimeoutError):
+ await asyncio.wait_for(task, timeout=0)
+
+ @TestBase.async_test
+ async def testRequire(self):
+ """
+ Test what happens when a connection attempt is made while CONNECTING.
+ """
+ await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+ task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+ state = await self.proto.runstate_changed()
+ self.assertEqual(state, Runstate.CONNECTING)
+
+ with self.assertRaises(StateError) as context:
+ await self._bad_connection('UNIX')
+
+ self.assertEqual(
+ context.exception.error_message,
+ "NullProtocol is currently connecting."
+ )
+ self.assertEqual(context.exception.state, Runstate.CONNECTING)
+ self.assertEqual(context.exception.required, Runstate.IDLE)
+
+ task.cancel()
+ await task
+
+ @TestBase.async_test
+ async def testImplicitRunstateInit(self):
+ """
+ Test what happens if we do not wait on the runstate event until
+ AFTER a connection is made, i.e., connect()/accept() themselves
+ initialize the runstate event. All of the above tests force the
+ initialization by waiting on the runstate *first*.
+ """
+ task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+ # Kick the loop to coerce the state change
+ await asyncio.sleep(0)
+ assert self.proto.runstate == Runstate.CONNECTING
+
+ # We already missed the transition to CONNECTING
+ await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
+
+ task.cancel()
+ await task
+
+
+class Accept(Connect):
+ """
+ All of the same tests as Connect, but using the accept() interface.
+ """
+ async def _bad_connection(self, family: str):
+ assert family in ('INET', 'UNIX')
+
+ if family == 'INET':
+ await self.proto.accept(('example.com', 1))
+ elif family == 'UNIX':
+ await self.proto.accept('/dev/null')
+
+ async def _hanging_connection(self):
+ with TemporaryDirectory(suffix='.aqmp') as tmpdir:
+ sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
+ await self.proto.accept(sock)
+
+
+class FakeSession(TestBase):
+
+ def setUp(self):
+ super().setUp()
+ self.proto.fake_session = True
+
+ async def _asyncSetUp(self):
+ await super()._asyncSetUp()
+ await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
+
+ async def _asyncTearDown(self):
+ await self.proto.disconnect()
+ await super()._asyncTearDown()
+
+ ####
+
+ @TestBase.async_test
+ async def testFakeConnect(self):
+
+ """Test the full state lifecycle (via connect) with a no-op session."""
+ await self.proto.connect('/not/a/real/path')
+ self.assertEqual(self.proto.runstate, Runstate.RUNNING)
+
+ @TestBase.async_test
+ async def testFakeAccept(self):
+ """Test the full state lifecycle (via accept) with a no-op session."""
+ await self.proto.accept('/not/a/real/path')
+ self.assertEqual(self.proto.runstate, Runstate.RUNNING)
+
+ @TestBase.async_test
+ async def testFakeRecv(self):
+ """Test receiving a fake/null message."""
+ await self.proto.accept('/not/a/real/path')
+
+ logname = self.proto.logger.name
+ with self.assertLogs(logname, level='DEBUG') as context:
+ self.proto.trigger_input.set()
+ self.proto.trigger_input.clear()
+ await asyncio.sleep(0) # Kick reader.
+
+ self.assertEqual(
+ context.output,
+ [f"DEBUG:{logname}:<-- None"],
+ )
+
+ @TestBase.async_test
+ async def testFakeSend(self):
+ """Test sending a fake/null message."""
+ await self.proto.accept('/not/a/real/path')
+
+ logname = self.proto.logger.name
+ with self.assertLogs(logname, level='DEBUG') as context:
+ # Cheat: Send a Null message to nobody.
+ await self.proto.send_msg()
+ # Kick writer; awaiting on a queue.put isn't sufficient to yield.
+ await asyncio.sleep(0)
+
+ self.assertEqual(
+ context.output,
+ [f"DEBUG:{logname}:--> None"],
+ )
+
+ async def _prod_session_api(
+ self,
+ current_state: Runstate,
+ error_message: str,
+ accept: bool = True
+ ):
+ with self.assertRaises(StateError) as context:
+ if accept:
+ await self.proto.accept('/not/a/real/path')
+ else:
+ await self.proto.connect('/not/a/real/path')
+
+ self.assertEqual(context.exception.error_message, error_message)
+ self.assertEqual(context.exception.state, current_state)
+ self.assertEqual(context.exception.required, Runstate.IDLE)
+
+ @TestBase.async_test
+ async def testAcceptRequireRunning(self):
+ """Test that accept() cannot be called when Runstate=RUNNING"""
+ await self.proto.accept('/not/a/real/path')
+
+ await self._prod_session_api(
+ Runstate.RUNNING,
+ "NullProtocol is already connected and running.",
+ accept=True,
+ )
+
+ @TestBase.async_test
+ async def testConnectRequireRunning(self):
+ """Test that connect() cannot be called when Runstate=RUNNING"""
+ await self.proto.accept('/not/a/real/path')
+
+ await self._prod_session_api(
+ Runstate.RUNNING,
+ "NullProtocol is already connected and running.",
+ accept=False,
+ )
+
+ @TestBase.async_test
+ async def testAcceptRequireDisconnecting(self):
+ """Test that accept() cannot be called when Runstate=DISCONNECTING"""
+ await self.proto.accept('/not/a/real/path')
+
+ # Cheat: force a disconnect.
+ await self.proto.simulate_disconnect()
+
+ await self._prod_session_api(
+ Runstate.DISCONNECTING,
+ ("NullProtocol is disconnecting."
+ " Call disconnect() to return to IDLE state."),
+ accept=True,
+ )
+
+ @TestBase.async_test
+ async def testConnectRequireDisconnecting(self):
+ """Test that connect() cannot be called when Runstate=DISCONNECTING"""
+ await self.proto.accept('/not/a/real/path')
+
+ # Cheat: force a disconnect.
+ await self.proto.simulate_disconnect()
+
+ await self._prod_session_api(
+ Runstate.DISCONNECTING,
+ ("NullProtocol is disconnecting."
+ " Call disconnect() to return to IDLE state."),
+ accept=False,
+ )
+
+
+class SimpleSession(TestBase):
+
+ def setUp(self):
+ super().setUp()
+ self.server = LineProtocol(type(self).__name__ + '-server')
+
+ async def _asyncSetUp(self):
+ await super()._asyncSetUp()
+ await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
+
+ async def _asyncTearDown(self):
+ await self.proto.disconnect()
+ try:
+ await self.server.disconnect()
+ except EOFError:
+ pass
+ await super()._asyncTearDown()
+
+ @TestBase.async_test
+ async def testSmoke(self):
+ with TemporaryDirectory(suffix='.aqmp') as tmpdir:
+ sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
+ server_task = create_task(self.server.accept(sock))
+
+ # give the server a chance to start listening [...]
+ await asyncio.sleep(0)
+ await self.proto.connect(sock)
diff --git a/python/tests/pylint.sh b/python/tests/pylint.sh
new file mode 100755
index 000000000..4b10b34db
--- /dev/null
+++ b/python/tests/pylint.sh
@@ -0,0 +1,2 @@
+#!/bin/sh -e
+python3 -m pylint qemu/