[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 2/2] QMP: add server mode to QEMUMonitorProtocol
From: |
Stefan Hajnoczi |
Subject: |
Re: [Qemu-devel] [PATCH 2/2] QMP: add server mode to QEMUMonitorProtocol |
Date: |
Thu, 26 May 2011 09:16:15 +0100 |
The test-stream.py script performs several automated tests of the image
streaming QMP interface, including exercising both the incremental and
background streaming modes.
This should probably be ported to KVM-Autotest rather than reinventing
the wheel.
Signed-off-by: Stefan Hajnoczi <address@hidden>
---
This is an example of how I am using qmp.py, especially the new
QEMUMonitorProtocol(server=True) and QEMUMonitorProtocol.get_events(wait=True)
arguments.
I am not submitting this for merge. Like the commit message says, this should
probably be ported to KVM-Autotest because it duplicates basic QEMU testing
infrastructure.
test-stream.py | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 145 insertions(+), 0 deletions(-)
create mode 100644 test-stream.py
diff --git a/test-stream.py b/test-stream.py
new file mode 100644
index 0000000..eb9d9d6
--- /dev/null
+++ b/test-stream.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+import unittest
+import subprocess
+import re
+import os
+import sys; sys.path.append('QMP/')
+import qmp
+
def qemu_img(*args):
    """Run ./qemu-img with the given arguments and return its exit status.

    The child's stdin and stdout are both connected to /dev/null; stderr is
    not redirected.

    args -- command-line arguments appended after './qemu-img'.
    """
    # Close the null-device handle once the child has exited instead of
    # leaking one open file per invocation.
    with open('/dev/null', 'r+') as devnull:
        return subprocess.call(['./qemu-img'] + list(args),
                               stdin=devnull, stdout=devnull)
+
class VM(object):
    """A QEMU process controlled through a QMP monitor socket.

    The monitor socket and the QEMU log file both live in /tmp and carry the
    PID of the test process in their names.
    """

    def __init__(self):
        """Prepare the command line; no process is started yet."""
        self._monitor_path = '/tmp/qemu-mon.%d' % os.getpid()
        self._qemu_log_path = '/tmp/qemu-log.%d' % os.getpid()
        self._args = ['x86_64-softmmu/qemu-system-x86_64',
                      '-chardev', 'socket,id=mon,path=' + self._monitor_path,
                      '-mon', 'chardev=mon,mode=control',
                      '-nographic']
        self._num_drives = 0

    def add_drive(self, path):
        """Append a virtio drive backed by the image at path.

        Drives get the ids drive0, drive1, ... in the order they are added.
        Returns self so that calls can be chained.
        """
        self._args.append('-drive')
        self._args.append('if=virtio,cache=none,file=%s,id=drive%d' %
                          (path, self._num_drives))
        self._num_drives += 1
        return self

    def launch(self):
        """Start QEMU and wait for the monitor connection to be accepted.

        QEMU's stdin is /dev/null; its stdout and stderr go to the log file.
        """
        # The child process gets its own copies of these descriptors, so the
        # parent's handles are closed as soon as Popen() returns rather than
        # being left open for the lifetime of the test process.
        with open('/dev/null', 'rb') as devnull:
            with open(self._qemu_log_path, 'wb') as qemulog:
                # The monitor object is created in server mode before QEMU
                # is spawned; accept() below then waits for the connection.
                self._qmp = qmp.QEMUMonitorProtocol(self._monitor_path,
                                                    server=True)
                self._popen = subprocess.Popen(self._args, stdin=devnull,
                                               stdout=qemulog,
                                               stderr=subprocess.STDOUT)
        self._qmp.accept()

    def shutdown(self):
        """Send the 'quit' command, wait for QEMU to exit and clean up."""
        try:
            self._qmp.cmd('quit')
            self._popen.wait()
        finally:
            # Remove the socket and the log even when quitting failed, so a
            # broken run does not leave stale files behind in /tmp.
            for path in (self._monitor_path, self._qemu_log_path):
                if os.path.exists(path):
                    os.remove(path)

    def qmp(self, cmd, **args):
        """Issue QMP command cmd with the keyword arguments as its arguments.

        Returns whatever the monitor's cmd() returns.
        """
        return self._qmp.cmd(cmd, args=args)

    def get_qmp_events(self, wait=False):
        """Return the monitor's events and clear them from the monitor.

        wait is passed through to get_events(); presumably it blocks until
        an event is available -- confirm against qmp.py.
        """
        events = self._qmp.get_events(wait=wait)
        self._qmp.clear_events()
        return events
+
# Splits an indexed path component such as "return[0]" into ("return", "0").
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')


class QMPTestCase(unittest.TestCase):
    """Base class with helpers for checking QMP responses.

    assert_no_active_streams() reads self.vm, which this class does not set;
    subclasses are expected to provide it.
    """

    def dictpath(self, d, path):
        """Traverse a path in a nested dict and return the value found.

        path is a '/'-separated list of dict keys.  A component may carry a
        list index, e.g. 'return[0]/device'.  Any lookup problem is reported
        through self.fail() rather than as a raw exception.
        """
        for component in path.split('/'):
            idx = None
            m = index_re.match(component)
            if m:
                component, idx_text = m.groups()
                try:
                    idx = int(idx_text)
                except ValueError:
                    # A malformed index is a problem with the path; report
                    # it as a test failure like every other traversal error.
                    self.fail('invalid index "%s" in path "%s" in "%s"' %
                              (idx_text, path, str(d)))

            if not isinstance(d, dict) or component not in d:
                self.fail('failed path traversal for "%s" in "%s"' %
                          (path, str(d)))
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail('path component "%s" in "%s" is not a list in "%s"' %
                              (component, path, str(d)))
                try:
                    d = d[idx]
                except IndexError:
                    self.fail('invalid index "%s" in path "%s" in "%s"' %
                              (idx, path, str(d)))
        return d

    def assert_qmp(self, d, path, value):
        """Assert that the value found at path inside d equals value."""
        result = self.dictpath(d, path)
        self.assertEqual(result, value, 'values not equal "%s" and "%s"' %
                         (str(result), str(value)))

    def assert_no_active_streams(self):
        """Assert that 'query-block-stream' returns an empty list."""
        result = self.vm.qmp('query-block-stream')
        self.assert_qmp(result, 'return', [])
+
class TestSingleDrive(QMPTestCase):
    """Image streaming tests against a VM with one QED drive (drive0)."""

    # Size given to 'qemu-img create' for the backing image; the tests
    # expect the stream length reported over QMP to equal it.
    image_len = 1 * 1024 * 1024

    def setUp(self):
        """Create the backing and QED images, then launch the VM."""
        # Stop right away if an image could not be created instead of
        # failing later with a less obvious error.
        status = qemu_img('create', 'backing.img',
                          str(TestSingleDrive.image_len))
        self.assertEqual(status, 0, 'qemu-img failed to create backing.img')
        status = qemu_img('create', '-f', 'qed', '-o',
                          'backing_file=backing.img,copy_on_read=on',
                          'test.qed')
        self.assertEqual(status, 0, 'qemu-img failed to create test.qed')
        self.vm = VM().add_drive('test.qed')
        self.vm.launch()

    def tearDown(self):
        """Shut the VM down and delete both image files."""
        try:
            self.vm.shutdown()
        finally:
            # Delete the images even if the shutdown itself failed.
            os.remove('test.qed')
            os.remove('backing.img')

    def _stream_completed(self, event):
        """Return True if event is BLOCK_STREAM_COMPLETED, else False.

        For a completion event, also assert that it names drive0 and that
        both its offset and its length equal image_len.
        """
        if event['event'] != 'BLOCK_STREAM_COMPLETED':
            return False
        self.assert_qmp(event, 'data/device', 'drive0')
        self.assert_qmp(event, 'data/offset', self.image_len)
        self.assert_qmp(event, 'data/len', self.image_len)
        return True

    def test_stream_incremental(self):
        """Issue block_stream repeatedly until the completion event arrives."""
        self.assert_no_active_streams()

        completed = False
        while not completed:
            result = self.vm.qmp('block_stream', device='drive0')
            self.assert_qmp(result, 'return/device', 'drive0')
            self.assert_qmp(result, 'return/len', self.image_len)

            for event in self.vm.get_qmp_events():
                if self._stream_completed(event):
                    completed = True

            # Compare result against query-block-stream output
            if not completed:
                query = self.vm.qmp('query-block-stream')
                self.assert_qmp(query, 'return[0]/device', 'drive0')
                self.assert_qmp(query, 'return[0]/offset',
                                self.dictpath(result, 'return/offset'))
                self.assert_qmp(query, 'return[0]/len', self.image_len)

        self.assert_no_active_streams()

    def test_stream_all(self):
        """Issue block_stream with all=True and wait for completion."""
        self.assert_no_active_streams()

        result = self.vm.qmp('block_stream', device='drive0', all=True)
        self.assert_qmp(result, 'return', {})

        completed = False
        while not completed:
            for event in self.vm.get_qmp_events(wait=True):
                if self._stream_completed(event):
                    completed = True

        self.assert_no_active_streams()

    def test_device_not_found(self):
        """block_stream on an unknown device reports DeviceNotFound."""
        result = self.vm.qmp('block_stream', device='nonexistent')
        self.assert_qmp(result, 'error/class', 'DeviceNotFound')
+
if __name__ == '__main__':
    # Script entry point: hand control to unittest's command-line runner.
    unittest.main()
--
1.7.4.4