qemu_io_args_no_fmt += \
os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
-qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
+qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
+qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
connect_stderr=False)
return returncode, output if returncode else ''
+def qemu_nbd_list_log(*args: str) -> str:
+ '''Run qemu-nbd to list remote exports'''
+ full_args = [qemu_nbd_prog, '-L'] + list(args)
+ output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
+ log(output, filters=[filter_testfiles, filter_nbd_exports])
+ return output
+
@contextmanager
def qemu_nbd_popen(*args):
'''Context manager running qemu-nbd within the context'''
return value
return filter_qmp(qmsg, _filter)
+def filter_nbd_exports(output: str) -> str:
+ return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
+
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)