qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH] qemu/machine: add asyncio process-management demo


From: John Snow
Subject: Re: [PATCH] qemu/machine: add asyncio process-management demo
Date: Fri, 2 Dec 2022 19:20:09 -0500

On Fri, Dec 2, 2022 at 3:57 PM John Snow <jsnow@redhat.com> wrote:
>
> This is just an RFC to show a technique for how to watch terminal input
> using asyncio. This is just a *demo* and elides many things for
> simplicity; namely I don't create a QMP monitor nor a guest console
> socket.
>
> What I really wanted feedback on was an API for consuming information
> from an async stream. In this demo, that's QEMU's terminal output
> (stdout, stderr -- usually pretty minimal information in most
> circumstances except in error cases) but the same techniques may apply
> to guest console output, too.
>
> Please see comments and docstrings inline for RFC questions and other
> observations.
>
> To test it, navigate to qemu.git/python, and then invoke e.g.;
>
> > python3 -m qemu.machine.demo /usr/bin/qemu-system-x86_64 --help
>
> (Note, this demo requires Python 3.7+; not as a fundamental necessity,
> but 3.6 requires some ifdef style tricks to run that I didn't bother to
> code in for a quick demo. Sorry.)
>
> or try it with a bad flag on purpose (--frobnozz), no flag at all, or
> try with *any* executable of your choice to see how stdout/stderr, return
> codes and newline terminations are handled.
>
> Also try:
>
> > python3 -m qemu.machine.demo /usr/bin/echo -n "no newline?"
> > python3 -m qemu.machine.demo /usr/bin/yes "I hate the antichrist"

*cough* this is a bit of an inside joke that I forgot to edit out of my draft.
Context is https://i.kym-cdn.com/photos/images/original/002/368/100/427
(Does that help? No? Oh well.)

>
> One huge caveat:
>
> I would *really have liked* to offer an interface that's exactly
> equivalent to asyncio.StreamReader() that would allow a user to
> asynchronously *consume* data using the same API that Python
> offers. However, I could not find a way to provide a kind of
> "pass-through" that also provided logging, logging-to-file, in-memory
> buffering, etc.
>
> I briefly experimented with using an actual pipe via os.pipe() for which
> I could asynchronously read and write, but the overhead of asyncio
> machinery here felt a bit high and perhaps simultaneously overkill and
> porcelain. It's possible I'm missing an easier way to provide this type
> of feature, but it seems like the sort of thing we might want for
> e.g. asynchronously watching for lines from a guest console.
>
> I'm not sure if it will be possible to add. If there's interest and it
> seems worth pursuing, I'll try to push on it. Otherwise, maybe what we
> have here is "good enough"?
>
> *shrug*
>
> Anyway, thanks!
>
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
>  python/qemu/machine/demo.py | 276 ++++++++++++++++++++++++++++++++++++
>  1 file changed, 276 insertions(+)
>  create mode 100644 python/qemu/machine/demo.py
>
> diff --git a/python/qemu/machine/demo.py b/python/qemu/machine/demo.py
> new file mode 100644
> index 00000000000..329c2fc5532
> --- /dev/null
> +++ b/python/qemu/machine/demo.py
> @@ -0,0 +1,276 @@
> +"""
> +This module is a quick demonstration to show how to asynchronously
> +watch terminal output using Python asyncio, and how it might relate to
> +improving QEMUMachine.
> +
> +This demo does NOT include console output nor a QMP monitor, but the
> +techniques being applied here might be applied to guest console
> +interactions.
> +"""
> +
> +import asyncio
> +import io
> +import locale
> +import logging
> +import os
> +from pathlib import Path
> +import resource
> +import sys
> +from typing import (
> +    Any,
> +    BinaryIO,
> +    List,
> +    Optional,
> +    Union,
> +)
> +
> +
> +class StreamWatcher:
> +    """
> +    StreamWatcher is a bit of a quick hack that consumes data (as
> +    bytes) from an asyncio.StreamReader and relays it to several other
> +    sources concurrently.
> +
> +    Conceptually, it's kind of like "tee": data from one pipe is
> +    forwarded to several.
> +
> +    A single instance of this class watches either stdout or
> +    stderr. (A user could combine these streams and then a single
> +    instance of this class could watch both.)
> +
> +    The data stream being watched is forwarded to three destinations:
> +
> +    (1) BytesIO (In-memory buffer)
> +        --------------------------
> +
> +        The data is buffered directly into a BytesIO object. This isn't
> +        used to do anything further, but a caller could get the entire
> +        stream (so far) at any time. This mimics the "iolog" property of
> +        QEMUMachine, which we have used in the past to show the console
> +        output on various error conditions or to assert that certain
> +        patterns have occurred in iotests.
> +
> +        This can grow without bound, which might be a bad idea. It seems
> +        convenient to have on by default, but for more serious uses, you
> +        may want to actually disable this. Maybe it'd be useful to be
> +        able to configure which sources you want active by default.
> +
> +    (2) External logfile
> +        ----------------
> +
> +        Data is written byte-for-byte into an external logfile. In this
> +        demo, StreamWatcher does not own the file object so that two
> +        StreamWatchers can share the same logfile -- so that a
> +        stderr-watcher and a stdout-watcher can log to the same file.
> +
> +        This destination adds the "logfile" and "flush" parameters to
> +        the initializer; flush=True can be used for the stderr-watcher
> +        if desired to flush the file to disk without waiting for a
> +        newline.
> +
> +        This might be a bit extraneous since we already have an
> +        in-memory log, but I added it here purely because if all else
> +        fails -- if we don't ever print out the in-memory buffer and we
> +        don't enable logging -- we can likely rely on a good
> +        old-fashioned solid bohr-model file.
> +
> +    (3) Python Logging Interface
> +        ------------------------
> +
> +        Data is manually re-buffered and when a newline is encountered,
> +        the buffer is flushed into the Python logging subsystem. This
> +        may mean that if terminal output is not terminated with a
> +        newline, we may hang onto it in the buffer. When EOF is
> +        encountered, any remaining information in the buffer is flushed
> +        with a newline marker inserted to indicate that a newline was
> +        not actually seen. (This is how FiSH seems to handle it, and I
> +        like it.)
> +
> +        This destination adds two more parameters: logger and level. The
> +        logger is the Logger instance to log to, while the level
> +        determines which logging level to use for messages in this
> +        stream. In this demo, I use "INFO" for stdout and "WARNING" for
> +        stderr. If the Python logging subsystem is not configured, the
> +        default behavior is to hide "INFO" messages but to show
> +        "WARNING" messages. This might be the most useful behavior for
> +        helping to surface potential errors, but it's possible it will
> +        be a pain for certain kinds of iotesting.
> +    """
> +    # pylint: disable=too-few-public-methods
> +    def __init__(
> +            self,
> +            pipe: asyncio.StreamReader,
> +            logger: logging.Logger,
> +            level: int,
> +            logfile: BinaryIO,
> +            flush: bool = False):
> +        self.pipe = pipe
> +        self.logfile = logfile
> +        self.logger = logger
> +        self.level = level
> +        self.flush = flush
> +
> +        self.data = io.BytesIO()
> +        self.buffer = bytearray()
> +
> +        # We need an encoding for whatever we're watching.
> +        # For console output, assume it's whatever our locale says.
> +        # If this guess is wrong, go ahead and change it, pal.
> +        _, encoding = locale.getlocale()
> +        self.encoding = encoding or 'UTF-8'
> +
> +    async def run(self) -> None:
> +        """
> +        Run forever, waiting for new data.
> +
> +        When the stream hits EOF, return.
> +        """
> +        self.logger.debug("StreamWatcher starting")
> +        pagesize = resource.getpagesize()
> +        while True:
> +            data = await self.pipe.read(pagesize)
> +            await self._handle_data(data)
> +            if not data:
> +                break
> +        self.logger.debug("StreamWatcher exiting")
> +
> +    async def _handle_data(self, data: bytes) -> None:
> +        # Destination A: Internal line-based buffer
> +        await self._buffer_data(data)
> +
> +        # Destination B: Internal byte-based log
> +        self.data.write(data)
> +
> +        # Destination C: External logfile
> +        self.logfile.write(data)
> +        if self.flush:
> +            self.logfile.flush()
> +
> +    async def _buffer_data(self, data: bytes) -> None:
> +        self.buffer.extend(data)
> +        if not self.buffer:
> +            return
> +
> +        lines = self.buffer.split(b'\n')
> +        if lines[-1]:  # trailing line was not (yet) terminated
> +            self.buffer = lines[-1]
> +        else:
> +            self.buffer.clear()
> +
> +        for line in lines[:-1]:
> +            await self._handle_line(
> +                line.decode(self.encoding, errors='replace'))
> +
> +        if data == b'' and self.buffer:
> +            # EOL; flush the remainder of the buffer.
> +            await self._handle_line(
> +                self.buffer.decode(self.encoding, errors='replace') + '⏎')
> +
> +    async def _handle_line(self, line: str) -> None:
> +        self.logger.log(self.level, line)
> +
> +
> +class ExecManager:
> +    """
> +    Simple demo for executing a child process while gathering its output.
> +    """
> +    logger = logging.getLogger(__name__)
> +
> +    def __init__(self) -> None:
> +        self.process: Optional[asyncio.subprocess.Process] = None
> +        self.log: Optional[BinaryIO] = None
> +        self.stdout: Optional[StreamWatcher] = None
> +        self.stderr: Optional[StreamWatcher] = None
> +        self._tasks: List[asyncio.Task[Any]] = []
> +
> +    async def launch(self, binary: Union[str, Path], *args: str) -> None:
> +        """Launch the executable, but don't wait for it."""
> +        self.logger.debug("launching '%s'", binary)
> +        self.logger.debug("%s", ' '.join((str(binary),) + args))
> +        self.process = await asyncio.create_subprocess_exec(
> +            str(binary),
> +            *args,
> +            stdin=asyncio.subprocess.DEVNULL,
> +            stdout=asyncio.subprocess.PIPE,
> +            stderr=asyncio.subprocess.PIPE,
> +        )
> +        # Type hints for mypy
> +        assert self.process.stdout is not None
> +        assert self.process.stderr is not None
> +
> +        self.log = open("qemu.log", "wb")
> +        self.stdout = StreamWatcher(
> +            self.process.stdout,
> +            self.logger.getChild('stdout'),
> +            logging.INFO,
> +            self.log)
> +        self.stderr = StreamWatcher(
> +            self.process.stderr,
> +            self.logger.getChild('stderr'),
> +            logging.WARNING,
> +            self.log,
> +            flush=True)
> +        self._tasks.append(asyncio.create_task(self.stdout.run()))
> +        self._tasks.append(asyncio.create_task(self.stderr.run()))
> +
> +    async def wait(self) -> None:
> +        """Wait for the process and all watchers to finish."""
> +        if self.process is None:
> +            raise Exception("Nothing's running, pal!")
> +        self.logger.debug("bundling reader tasks and process waiter ...")
> +        task = asyncio.gather(
> +            *self._tasks,
> +            self.process.wait(),
> +            return_exceptions=True,
> +        )
> +        # return_exceptions=True means that if any coroutine raises an
> +        # exception, all other coroutines will be cancelled and waited on.
> +        # Without this, the other coroutines continue to run after first
> +        # exception.
> +        self.logger.debug("waiting on bundled task ...")
> +        await task
> +        self.logger.debug("bundled task done.")
> +        if self.log is not None:
> +            self.logger.debug("closing logfile")
> +            self.log.close()
> +            if self.process.returncode == 0:
> +                self.logger.debug("No errors detected; deleting qemu.log")
> +                os.unlink("qemu.log")
> +            else:
> +                self.logger.debug(
> +                    "Process returned non-zero returncode, keeping qemu.log")
> +            self.log = None
> +
> +
> +async def main(binary: str, *args: str) -> int:
> +    """Run a subprocess, print out some stuff, have a good time."""
> +    logging.basicConfig(level=logging.DEBUG)
> +    proc = ExecManager()
> +    await proc.launch(binary, *args)
> +    assert proc.stdout is not None
> +    assert proc.stderr is not None
> +    logging.debug("process launched; waiting on termination")
> +    await proc.wait()
> +    logging.debug("process terminated.")
> +
> +    stdout = proc.stdout.data.getvalue()
> +    if stdout:
> +        print("========== stdout ==========")
> +        print(stdout.decode(proc.stdout.encoding), end='')
> +        print("=" * 80)
> +
> +    stderr = proc.stderr.data.getvalue()
> +    if stderr:
> +        print("========== stderr ==========")
> +        print(stderr.decode(proc.stderr.encoding), end='')
> +        print("=" * 80)
> +
> +    assert proc.process is not None
> +    assert proc.process.returncode is not None
> +    print(f"process returncode was {proc.process.returncode}")
> +    print("OK, seeya!")
> +    return proc.process.returncode
> +
> +
> +if __name__ == '__main__':
> +    sys.exit(asyncio.run(main(*sys.argv[1:])))
> --
> 2.38.1
>




reply via email to

[Prev in Thread] Current Thread [Next in Thread]