4 This class pretends to be qemu.qmp.QEMUMonitorProtocol.
17 from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
19 from .error import AQMPError
20 from .protocol import Runstate
21 from .qmp_client import QMPClient
24 # pylint: disable=missing-docstring
27 class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
28 def __init__(self, address: SocketAddrT,
30 nickname: Optional[str] = None):
32 # pylint: disable=super-init-not-called
33 self._aqmp = QMPClient(nickname)
34 self._aloop = asyncio.get_event_loop()
35 self._address = address
36 self._timeout: Optional[float] = None
41 self, future: Awaitable[_T], timeout: Optional[float] = None
43 return self._aloop.run_until_complete(
44 asyncio.wait_for(future, timeout=timeout)
47 def _get_greeting(self) -> Optional[QMPMessage]:
48 if self._aqmp.greeting is not None:
49 # pylint: disable=protected-access
50 return self._aqmp.greeting._asdict()
53 # __enter__ and __exit__ need no changes
54 # parse_address needs no changes
56 def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
57 self._aqmp.await_greeting = negotiate
58 self._aqmp.negotiate = negotiate
61 self._aqmp.connect(self._address)
63 return self._get_greeting()
65 def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
66 self._aqmp.await_greeting = True
67 self._aqmp.negotiate = True
70 self._aqmp.accept(self._address),
74 ret = self._get_greeting()
75 assert ret is not None
78 def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
81 # pylint: disable=protected-access
83 # _raw() isn't a public API, because turning off
84 # automatic ID assignment is discouraged. For
85 # compatibility with iotests *only*, do it anyway.
86 self._aqmp._raw(qmp_cmd, assign_id=False),
91 # Default impl of cmd() delegates to cmd_obj
93 def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
95 self._aqmp.execute(cmd, kwds),
100 wait: Union[bool, float] = False) -> Optional[QMPMessage]:
102 # wait is False/0: "do not wait, do not except."
103 if self._aqmp.events.empty():
106 # If wait is 'True', wait forever. If wait is False/0, the events
107 # queue must not be empty; but it still needs some real amount
108 # of time to complete.
110 if wait and isinstance(wait, float):
115 self._aqmp.events.get(),
120 def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
121 events = [dict(x) for x in self._aqmp.events.clear()]
125 event = self.pull_event(wait)
126 return [event] if event is not None else []
def clear_events(self) -> None:
    """Clear the client's event queue; clear()'s result is dropped."""
    pending = self._aqmp.events
    pending.clear()
131 def close(self) -> None:
133 self._aqmp.disconnect()
def settimeout(self, timeout: Optional[float]) -> None:
    """
    Store *timeout* on this instance as ``_timeout``.

    :param timeout: The value to remember; ``None`` is accepted and
        stored as-is.
    """
    self._timeout = timeout
def send_fd_scm(self, fd: int) -> None:
    """
    Hand *fd* to the wrapped client's ``send_fd_scm``.

    :param fd: Integer file descriptor, passed through unchanged.
    """
    client = self._aqmp
    client.send_fd_scm(fd)
142 def __del__(self) -> None:
143 if self._aqmp.runstate == Runstate.IDLE:
146 if not self._aloop.is_running():
149 # Garbage collection ran while the event loop was running.
150 # Nothing we can do about it now, but if we don't raise our
151 # own error, the user will be treated to a lot of traceback
152 # they might not understand.
154 "QEMUMonitorProtocol.close()"
155 " was not called before object was garbage collected"