Re: [PATCH] qemu/machine: add asyncio process-management demo
From: John Snow
Subject: Re: [PATCH] qemu/machine: add asyncio process-management demo
Date: Fri, 2 Dec 2022 19:20:09 -0500

On Fri, Dec 2, 2022 at 3:57 PM John Snow <jsnow@redhat.com> wrote:
>
> This is just an RFC to show a technique for how to watch terminal input
> using asyncio. This is just a *demo* and elides many things for
> simplicity; namely I don't create a QMP monitor nor a guest console
> socket.
>
> What I really wanted feedback on was an API for consuming information
> from an async stream. In this demo, that's QEMU's terminal output
> (stdout, stderr -- usually pretty minimal information in most
> circumstances except in error cases) but the same techniques may apply
> to guest console output, too.
>
> Please see comments and docstrings inline for RFC questions and other
> observations.
>
> To test it, navigate to qemu.git/python, and then invoke e.g.:
>
> > python3 -m qemu.machine.demo /usr/bin/qemu-system-x86_64 --help
>
> (Note, this demo requires Python 3.7+; not as a fundamental necessity,
> but 3.6 requires some ifdef style tricks to run that I didn't bother to
> code in for a quick demo. Sorry.)
>
> or try it with a bad flag on purpose (--frobnozz), no flag at all, or
> try with *any* executable of your choice to see how stdout/stderr, return
> codes and newline terminations are handled.
>
> Also try:
>
> > python3 -m qemu.machine.demo /usr/bin/echo -n "no newline?"
> > python3 -m qemu.machine.demo /usr/bin/yes "I hate the antichrist"
*cough* this is a bit of an inside joke that I forgot to edit out of my draft.
Context is https://i.kym-cdn.com/photos/images/original/002/368/100/427
(Does that help? No? Oh well.)
>
> One huge caveat:
>
> I would *really have liked* to offer an interface that's exactly
> equivalent to asyncio.StreamReader() that would allow a user to
> asynchronously *consume* data using the same API that Python
> offers. However, I could not find a way to provide a kind of
> "pass-through" that also provided logging, logging-to-file, in-memory
> buffering, etc.
>
> I briefly experimented with using an actual pipe via os.pipe() for which
> I could asynchronously read and write, but the overhead of asyncio
> machinery here felt a bit high and perhaps simultaneously overkill and
> fragile. It's possible I'm missing an easier way to provide this type
> of feature, but it seems like the sort of thing we might want for
> e.g. asynchronously watching for lines from a guest console.
>
> I'm not sure if it will be possible to add. If there's interest and it
> seems worth pursuing, I'll try to push on it. Otherwise, maybe what we
> have here is "good enough"?
>
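For what it's worth, one possible shape for such a pass-through is sketched
below. This is not what the patch does; it is only an illustration. The names
tee_stream, on_data and demo are hypothetical. It relies on
StreamReader.feed_data() and StreamReader.feed_eof(), which exist in CPython's
asyncio but are not listed in the documented StreamReader API, so treat their
use as an assumption. There is also no backpressure: if the consumer is slow,
the second reader's buffer grows without bound.

```python
import asyncio
from typing import Callable, List, Tuple


async def tee_stream(
    source: asyncio.StreamReader,
    sink: asyncio.StreamReader,
    on_data: Callable[[bytes], None],
    chunk_size: int = 4096,
) -> None:
    """Copy source into sink, calling on_data() for every chunk seen."""
    while True:
        data = await source.read(chunk_size)
        if not data:
            # Propagate EOF so that readers of the sink terminate too.
            sink.feed_eof()
            return
        on_data(data)         # logging, logfile, BytesIO, ...
        sink.feed_data(data)  # the consumer sees the very same bytes


async def demo() -> Tuple[List[bytes], List[bytes]]:
    # Stand-in for process.stdout; a real caller would pass that instead.
    source = asyncio.StreamReader()
    source.feed_data(b"hello\nworld\n")
    source.feed_eof()

    seen: List[bytes] = []
    sink = asyncio.StreamReader()
    task = asyncio.create_task(tee_stream(source, sink, seen.append))

    # The consumer uses the ordinary StreamReader API on the sink.
    lines: List[bytes] = []
    while True:
        line = await sink.readline()
        if not line:
            break
        lines.append(line)
    await task
    return lines, seen
```

The consumer only ever touches `sink`, so it gets readline(), readuntil() and
friends for free, while the watcher keeps its logging side effects in
`on_data`.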
> *shrug*
>
> Anyway, thanks!
>
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
> python/qemu/machine/demo.py | 276 ++++++++++++++++++++++++++++++++++++
> 1 file changed, 276 insertions(+)
> create mode 100644 python/qemu/machine/demo.py
>
> diff --git a/python/qemu/machine/demo.py b/python/qemu/machine/demo.py
> new file mode 100644
> index 00000000000..329c2fc5532
> --- /dev/null
> +++ b/python/qemu/machine/demo.py
> @@ -0,0 +1,276 @@
> +"""
> +This module is a quick demonstration to show how to asynchronously
> +watch terminal output using Python asyncio, and how it might relate to
> +improving QEMUMachine.
> +
> +This demo does NOT include console output nor a QMP monitor, but the
> +techniques being applied here might be applied to guest console
> +interactions.
> +"""
> +
> +import asyncio
> +import io
> +import locale
> +import logging
> +import os
> +from pathlib import Path
> +import resource
> +import sys
> +from typing import (
> +    Any,
> +    BinaryIO,
> +    List,
> +    Optional,
> +    Union,
> +)
> +
> +
> +class StreamWatcher:
> +    """
> +    StreamWatcher is a bit of a quick hack that consumes data (as
> +    bytes) from an asyncio.StreamReader and relays it to several other
> +    destinations concurrently.
> +
> +    Conceptually, it's kind of like "tee": data from one pipe is
> +    forwarded to several.
> +
> +    A single instance of this class watches either stdout or
> +    stderr. (A user could combine these streams and then a single
> +    instance of this class could watch both.)
> +
> +    The data stream being watched is forwarded to three destinations:
> +
> +    (1) BytesIO (In-memory buffer)
> +        --------------------------
> +
> +        The data is buffered directly into a BytesIO object. This isn't
> +        used to do anything further, but a caller could get the entire
> +        stream (so far) at any time. This mimics the "iolog" property of
> +        QEMUMachine, which we have used in the past to show the console
> +        output on various error conditions or to assert that certain
> +        patterns have occurred in iotests.
> +
> +        This can grow without bound, which might be a bad idea. It seems
> +        convenient to have on by default, but for more serious uses, you
> +        may want to actually disable this. Maybe it'd be useful to be
> +        able to configure which destinations you want active by default.
> +
> +    (2) External logfile
> +        ----------------
> +
> +        Data is written byte-for-byte into an external logfile. In this
> +        demo, StreamWatcher does not own the file object so that two
> +        StreamWatchers can share the same logfile -- so that a
> +        stderr-watcher and a stdout-watcher can log to the same file.
> +
> +        This destination adds the "logfile" and "flush" parameters to
> +        the initializer; flush=True can be used for the stderr-watcher
> +        if desired to flush the file to disk without waiting for a
> +        newline.
> +
> +        This might be a bit extraneous since we already have an
> +        in-memory log, but I added it here purely because if all else
> +        fails -- if we don't ever print out the in-memory buffer and we
> +        don't enable logging -- we can likely rely on a good
> +        old-fashioned solid bohr-model file.
> +
> +    (3) Python Logging Interface
> +        ------------------------
> +
> +        Data is manually re-buffered and when a newline is encountered,
> +        the buffer is flushed into the Python logging subsystem. This
> +        may mean that if terminal output is not terminated with a
> +        newline, we may hang onto it in the buffer. When EOF is
> +        encountered, any remaining information in the buffer is flushed
> +        with a newline marker inserted to indicate that a newline was
> +        not actually seen. (This is how the fish shell seems to handle
> +        it, and I like it.)
> +
> +        This destination adds two more parameters: logger and level. The
> +        logger is the Logger instance to log to, while the level
> +        determines which logging level to use for messages in this
> +        stream. In this demo, I use "INFO" for stdout and "WARNING" for
> +        stderr. If the Python logging subsystem is not configured, the
> +        default behavior is to hide "INFO" messages but to show
> +        "WARNING" messages. This might be the most useful behavior for
> +        helping to surface potential errors, but it's possible it will
> +        be a pain for certain kinds of iotesting.
> +    """
> +    # pylint: disable=too-few-public-methods
> +    def __init__(
> +            self,
> +            pipe: asyncio.StreamReader,
> +            logger: logging.Logger,
> +            level: int,
> +            logfile: BinaryIO,
> +            flush: bool = False):
> +        self.pipe = pipe
> +        self.logfile = logfile
> +        self.logger = logger
> +        self.level = level
> +        self.flush = flush
> +
> +        self.data = io.BytesIO()
> +        self.buffer = bytearray()
> +
> +        # We need an encoding for whatever we're watching.
> +        # For console output, assume it's whatever our locale says.
> +        # If this guess is wrong, go ahead and change it, pal.
> +        _, encoding = locale.getlocale()
> +        self.encoding = encoding or 'UTF-8'
> +
> +    async def run(self) -> None:
> +        """
> +        Run forever, waiting for new data.
> +
> +        When the stream hits EOF, return.
> +        """
> +        self.logger.debug("StreamWatcher starting")
> +        pagesize = resource.getpagesize()
> +        while True:
> +            data = await self.pipe.read(pagesize)
> +            await self._handle_data(data)
> +            if not data:
> +                break
> +        self.logger.debug("StreamWatcher exiting")
> +
> +    async def _handle_data(self, data: bytes) -> None:
> +        # Destination (3): line buffer feeding the Python logger
> +        await self._buffer_data(data)
> +
> +        # Destination (1): in-memory byte log
> +        self.data.write(data)
> +
> +        # Destination (2): external logfile
> +        self.logfile.write(data)
> +        if self.flush:
> +            self.logfile.flush()
> +
> +    async def _buffer_data(self, data: bytes) -> None:
> +        self.buffer.extend(data)
> +        if not self.buffer:
> +            return
> +
> +        lines = self.buffer.split(b'\n')
> +        if lines[-1]:  # trailing line was not (yet) terminated
> +            self.buffer = lines[-1]
> +        else:
> +            self.buffer.clear()
> +
> +        for line in lines[:-1]:
> +            await self._handle_line(
> +                line.decode(self.encoding, errors='replace'))
> +
> +        if data == b'' and self.buffer:
> +            # EOF; flush the remainder of the buffer.
> +            await self._handle_line(
> +                self.buffer.decode(self.encoding, errors='replace') + '⏎')
> +
> +    async def _handle_line(self, line: str) -> None:
> +        self.logger.log(self.level, line)
> +
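The re-buffering logic above is easier to poke at in isolation. Here is a
minimal sketch of the same idea as a plain function, with no asyncio or
logging involved. split_lines and NO_NEWLINE_MARK are hypothetical names, not
part of the patch, and unlike the patch this version also empties the buffer
after the final flush.

```python
from typing import List

NO_NEWLINE_MARK = "⏎"  # same marker the patch appends at EOF


def split_lines(
    pending: bytearray, data: bytes, encoding: str = "utf-8"
) -> List[str]:
    """
    Append data to pending and return every complete line.

    Empty data means EOF: whatever is left in pending is returned
    with a marker showing that no newline was actually seen.
    """
    pending.extend(data)
    # Everything before the last b'\n' is complete; the rest is a tail.
    *complete, rest = pending.split(b"\n")
    pending[:] = rest  # keep only the unterminated tail
    lines = [line.decode(encoding, errors="replace") for line in complete]
    if not data and pending:
        tail = pending.decode(encoding, errors="replace")
        lines.append(tail + NO_NEWLINE_MARK)
        pending.clear()
    return lines
```

Feeding it b"one\ntw", then b"o\nthr", then b"" yields the lines "one", "two"
and finally "thr" with the marker appended, which is the behaviour the
docstring describes for output that does not end in a newline.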
> +
> +class ExecManager:
> +    """
> +    Simple demo for executing a child process while gathering its output.
> +    """
> +    logger = logging.getLogger(__name__)
> +
> +    def __init__(self) -> None:
> +        self.process: Optional[asyncio.subprocess.Process] = None
> +        self.log: Optional[BinaryIO] = None
> +        self.stdout: Optional[StreamWatcher] = None
> +        self.stderr: Optional[StreamWatcher] = None
> +        self._tasks: List[asyncio.Task[Any]] = []
> +
> +    async def launch(self, binary: Union[str, Path], *args: str) -> None:
> +        """Launch the executable, but don't wait for it."""
> +        self.logger.debug("launching '%s'", binary)
> +        self.logger.debug("%s", ' '.join((str(binary),) + args))
> +        self.process = await asyncio.create_subprocess_exec(
> +            str(binary),
> +            *args,
> +            stdin=asyncio.subprocess.DEVNULL,
> +            stdout=asyncio.subprocess.PIPE,
> +            stderr=asyncio.subprocess.PIPE,
> +        )
> +        # Type hints for mypy
> +        assert self.process.stdout is not None
> +        assert self.process.stderr is not None
> +
> +        self.log = open("qemu.log", "wb")
> +        self.stdout = StreamWatcher(
> +            self.process.stdout,
> +            self.logger.getChild('stdout'),
> +            logging.INFO,
> +            self.log)
> +        self.stderr = StreamWatcher(
> +            self.process.stderr,
> +            self.logger.getChild('stderr'),
> +            logging.WARNING,
> +            self.log,
> +            flush=True)
> +        self._tasks.append(asyncio.create_task(self.stdout.run()))
> +        self._tasks.append(asyncio.create_task(self.stderr.run()))
> +
> +    async def wait(self) -> None:
> +        """Wait for the process and all watchers to finish."""
> +        if self.process is None:
> +            raise Exception("Nothing's running, pal!")
> +        self.logger.debug("bundling reader tasks and process waiter ...")
> +        task = asyncio.gather(
> +            *self._tasks,
> +            self.process.wait(),
> +            return_exceptions=True,
> +        )
> +        # return_exceptions=True means that an exception raised by any
> +        # awaitable is returned in the result list instead of being
> +        # propagated, so gather() always waits for everything to finish.
> +        # Without it, the first exception is raised to the caller right
> +        # away while the other awaitables keep running unattended.
> +        self.logger.debug("waiting on bundled task ...")
> +        await task
> +        self.logger.debug("bundled task done.")
> +        if self.log is not None:
> +            self.logger.debug("closing logfile")
> +            self.log.close()
> +            if self.process.returncode == 0:
> +                self.logger.debug("No errors detected; deleting qemu.log")
> +                os.unlink("qemu.log")
> +            else:
> +                self.logger.debug(
> +                    "Process returned non-zero returncode, keeping qemu.log")
> +            self.log = None
> +
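On the gather() semantics used in wait(): a small self-contained sketch of
what return_exceptions=True does, for anyone who wants to check it
independently of the patch. The coroutine names fails, slow and demo are
hypothetical and exist only for this illustration.

```python
import asyncio
from typing import Any, List, Tuple


async def fails() -> None:
    raise RuntimeError("boom")


async def slow(done: List[str]) -> str:
    await asyncio.sleep(0.05)
    done.append("slow finished")
    return "ok"


async def demo() -> Tuple[List[Any], List[str]]:
    done: List[str] = []
    # The exception from fails() comes back as an ordinary result, and
    # slow() is neither cancelled nor abandoned: gather() waits for it.
    results = await asyncio.gather(
        fails(),
        slow(done),
        return_exceptions=True,
    )
    return results, done
```

Here `results[0]` is the RuntimeError instance and `results[1]` is "ok", so a
caller that cares about watcher failures has to inspect the result list
itself; nothing is raised by the await.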
> +
> +async def main(binary: str, *args: str) -> int:
> +    """Run a subprocess, print out some stuff, have a good time."""
> +    logging.basicConfig(level=logging.DEBUG)
> +    proc = ExecManager()
> +    await proc.launch(binary, *args)
> +    assert proc.stdout is not None
> +    assert proc.stderr is not None
> +    logging.debug("process launched; waiting on termination")
> +    await proc.wait()
> +    logging.debug("process terminated.")
> +
> +    stdout = proc.stdout.data.getvalue()
> +    if stdout:
> +        print("========== stdout ==========")
> +        print(stdout.decode(proc.stdout.encoding), end='')
> +        print("=" * 80)
> +
> +    stderr = proc.stderr.data.getvalue()
> +    if stderr:
> +        print("========== stderr ==========")
> +        print(stderr.decode(proc.stderr.encoding), end='')
> +        print("=" * 80)
> +
> +    assert proc.process is not None
> +    assert proc.process.returncode is not None
> +    print(f"process returncode was {proc.process.returncode}")
> +    print("OK, seeya!")
> +    return proc.process.returncode
> +
> +
> +if __name__ == '__main__':
> +    sys.exit(asyncio.run(main(*sys.argv[1:])))
> --
> 2.38.1
>