OSDN Git Service

python/aqmp: add __del__ method to legacy interface
[qmiga/qemu.git] / python / qemu / aqmp / legacy.py
1 """
2 Sync QMP Wrapper
3
4 This class pretends to be qemu.qmp.QEMUMonitorProtocol.
5 """
6
7 import asyncio
8 from typing import (
9     Awaitable,
10     List,
11     Optional,
12     TypeVar,
13     Union,
14 )
15
16 import qemu.qmp
17 from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
18
19 from .error import AQMPError
20 from .protocol import Runstate
21 from .qmp_client import QMPClient
22
23
24 # pylint: disable=missing-docstring
25
26
class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
    """
    Synchronous facade over an asynchronous `QMPClient`.

    Every blocking method here runs the matching `QMPClient` coroutine
    to completion on a stored asyncio event loop (see `_sync()`).

    :param address: Address later handed to `QMPClient.connect()` or
                    `QMPClient.accept()`.
    :param server: Not referenced anywhere in this class; presumably
                   kept for signature compatibility with the parent
                   class -- confirm against callers.
    :param nickname: Passed through to the `QMPClient` constructor.
    """

    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):

        # The parent initializer is deliberately skipped; this class only
        # sets up the attributes its own methods use.
        # pylint: disable=super-init-not-called
        self._aqmp = QMPClient(nickname)
        self._aloop = asyncio.get_event_loop()
        self._address = address
        # Default timeout applied by cmd_obj() and command();
        # None means "no timeout". Changed via settimeout().
        self._timeout: Optional[float] = None

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run ``future`` to completion on the stored loop and return its result.

        :param future: The awaitable to drive.
        :param timeout: Seconds to wait, or None to wait indefinitely.
        :raise asyncio.TimeoutError: When ``timeout`` elapses first.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """Return the client's greeting as a dict, or None if there is none."""
        if self._aqmp.greeting is not None:
            # pylint: disable=protected-access
            return self._aqmp.greeting._asdict()
        return None

    # __enter__ and __exit__ need no changes
    # parse_address needs no changes

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the stored address.

        :param negotiate: Stored on the client as both ``await_greeting``
                          and ``negotiate`` before connecting.
        :return: The greeting as a dict, or None if the client has none.
        """
        self._aqmp.await_greeting = negotiate
        self._aqmp.negotiate = negotiate

        self._sync(
            self._aqmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Accept a connection on the stored address.

        ``await_greeting`` and ``negotiate`` are both forced to True.

        :param timeout: Seconds to wait for the accept to complete, or
                        None to wait indefinitely.
        :return: The greeting as a dict.
        """
        self._aqmp.await_greeting = True
        self._aqmp.negotiate = True

        self._sync(
            self._aqmp.accept(self._address),
            timeout
        )

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send ``qmp_cmd`` as-is and return the reply as a plain dict.

        The timeout set via settimeout() applies.
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._aqmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    # Default impl of cmd() delegates to cmd_obj

    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Execute ``cmd`` with ``kwds`` as its arguments and return the result.

        The timeout set via settimeout() applies.
        """
        return self._sync(
            self._aqmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Fetch one event from the client's event queue.

        :param wait: False/0 returns None immediately when the queue is
                     empty; True waits indefinitely; any other non-zero
                     number (int or float) is a timeout in seconds.
        :return: The event as a dict, or None if not waiting and the
                 queue is empty.
        :raise asyncio.TimeoutError: When a numeric ``wait`` elapses.
        """
        # wait is False/0: "do not wait, do not except."
        if not wait and self._aqmp.events.empty():
            return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout: Optional[float] = None
        # bool is excluded explicitly because True means "forever".
        # Integers are accepted as well as floats: an int is a valid
        # value for a float-annotated parameter, and must not silently
        # turn into an unbounded wait.
        if wait and not isinstance(wait, bool):
            timeout = float(wait)

        return dict(
            self._sync(
                self._aqmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Drain and return all queued events.

        If none were queued, fall back to pull_event(wait) and return
        its single event (or an empty list if it yields None).
        """
        events = [dict(x) for x in self._aqmp.events.clear()]
        if events:
            return events

        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Discard every event currently in the client's event queue."""
        self._aqmp.events.clear()

    def close(self) -> None:
        """Disconnect the underlying client, blocking until it is done."""
        self._sync(
            self._aqmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """Set the timeout (seconds, or None) used by cmd_obj() and command()."""
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """Delegate to the client's send_fd_scm() with ``fd``."""
        self._aqmp.send_fd_scm(fd)

    def __del__(self) -> None:
        """
        Close a still-active session when the object is finalized.

        :raise AQMPError: When the session is not idle and the event loop
                          is running, so close() cannot be driven here.
        """
        # __del__ also runs for objects whose __init__ raised part-way
        # through; in that case these attributes may not exist and there
        # is no session to clean up.
        aqmp = getattr(self, '_aqmp', None)
        aloop = getattr(self, '_aloop', None)
        if aqmp is None or aloop is None:
            return

        if aqmp.runstate == Runstate.IDLE:
            return

        if not aloop.is_running():
            self.close()
        else:
            # Garbage collection ran while the event loop was running.
            # Nothing we can do about it now, but if we don't raise our
            # own error, the user will be treated to a lot of traceback
            # they might not understand.
            raise AQMPError(
                "QEMUMonitorProtocol.close()"
                " was not called before object was garbage collected"
            )