Run a tool and return both its output and its exit code
"""
stderr = subprocess.STDOUT if connect_stderr else None
- subp = subprocess.Popen(args,
- stdout=subprocess.PIPE,
- stderr=stderr,
- universal_newlines=True)
- output = subp.communicate()[0]
- if subp.returncode < 0:
- cmd = ' '.join(args)
- sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n')
- return (output, subp.returncode)
+ with subprocess.Popen(args, stdout=subprocess.PIPE,
+ stderr=stderr, universal_newlines=True) as subp:
+ output = subp.communicate()[0]
+ if subp.returncode < 0:
+ cmd = ' '.join(args)
+ sys.stderr.write(f'{tool} received signal \
+ {-subp.returncode}: {cmd}\n')
+ return (output, subp.returncode)
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
"""
class QemuIoInteractive:
def __init__(self, *args):
self.args = qemu_io_args_no_fmt + list(args)
+ # We need to keep the Popen objext around, and not
+ # close it immediately. Therefore, disable the pylint check:
+ # pylint: disable=consider-using-with
self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cmd.extend(args)
log('Start NBD server')
- p = subprocess.Popen(cmd)
- try:
- while not os.path.exists(pid_file):
- if p.poll() is not None:
- raise RuntimeError(
- "qemu-nbd terminated with exit code {}: {}"
- .format(p.returncode, ' '.join(cmd)))
-
- time.sleep(0.01)
- yield
- finally:
- if os.path.exists(pid_file):
- os.remove(pid_file)
- log('Kill NBD server')
- p.kill()
- p.wait()
+ with subprocess.Popen(cmd) as p:
+ try:
+ while not os.path.exists(pid_file):
+ if p.poll() is not None:
+ raise RuntimeError(
+ "qemu-nbd terminated with exit code {}: {}"
+ .format(p.returncode, ' '.join(cmd)))
+
+ time.sleep(0.01)
+ yield
+ finally:
+ if os.path.exists(pid_file):
+ os.remove(pid_file)
+ log('Kill NBD server')
+ p.kill()
+ p.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
'''Return True if two image files are identical'''
def create_image(name, size):
'''Create a fully-allocated raw image with sector markers'''
- file = open(name, 'wb')
- i = 0
- while i < size:
- sector = struct.pack('>l504xl', i // 512, i // 512)
- file.write(sector)
- i = i + 512
- file.close()
+ with open(name, 'wb') as file:
+ i = 0
+ while i < size:
+ sector = struct.pack('>l504xl', i // 512, i // 512)
+ file.write(sector)
+ i = i + 512
def image_size(img):
'''Return image's virtual size'''
t0 = time.time()
with f_bad.open('w', encoding="utf-8") as f:
- proc = subprocess.Popen(args, cwd=str(f_test.parent), env=env,
- stdout=f, stderr=subprocess.STDOUT)
- try:
- proc.wait()
- except KeyboardInterrupt:
- proc.terminate()
- proc.wait()
- return TestResult(status='not run',
- description='Interrupted by user',
- interrupted=True)
- ret = proc.returncode
+ with subprocess.Popen(args, cwd=str(f_test.parent), env=env,
+ stdout=f, stderr=subprocess.STDOUT) as proc:
+ try:
+ proc.wait()
+ except KeyboardInterrupt:
+ proc.terminate()
+ proc.wait()
+ return TestResult(status='not run',
+ description='Interrupted by user',
+ interrupted=True)
+ ret = proc.returncode
elapsed = round(time.time() - t0, 1)