@@ -37,6 +37,10 @@ from qemu import qtest
assert sys.version_info >= (3, 6)
+# Type Aliases
+QMPResponse = Dict[str, Any]
+
+
faulthandler.enable()
# This will not work if arguments contain spaces but is necessary if we
@@ -541,25 +545,30 @@ class VM(qtest.QEMUQtestMachine):
self._args.append(addr)
return self
- def pause_drive(self, drive, event=None):
- '''Pause drive r/w operations'''
+ def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse:
+ cmd = 'human-monitor-command'
+ kwargs = {'command-line': command_line}
+ if use_log:
+ return self.qmp_log(cmd, **kwargs)
+ else:
+ return self.qmp(cmd, **kwargs)
+
+ def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
+ """Pause drive r/w operations"""
if not event:
self.pause_drive(drive, "read_aio")
self.pause_drive(drive, "write_aio")
return
- self.qmp('human-monitor-command',
- command_line='qemu-io %s "break %s bp_%s"'
- % (drive, event, drive))
-
- def resume_drive(self, drive):
- self.qmp('human-monitor-command',
- command_line='qemu-io %s "remove_break bp_%s"'
- % (drive, drive))
-
- def hmp_qemu_io(self, drive, cmd):
- '''Write to a given drive using an HMP command'''
- return self.qmp('human-monitor-command',
- command_line='qemu-io %s "%s"' % (drive, cmd))
+ self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
+
+ def resume_drive(self, drive: str) -> None:
+ """Resume drive r/w operations"""
+ self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
+
+ def hmp_qemu_io(self, drive: str, cmd: str,
+ use_log: bool = False) -> QMPResponse:
+ """Write to a given drive using an HMP command"""
+ return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
def flatten_qmp_object(self, obj, output=None, basestr=''):
if output is None: