QMPResponse = Dict[str, Any]
+# Use this logger for logging messages directly from the iotests module
+logger = logging.getLogger('qemu.iotests')
+logger.addHandler(logging.NullHandler())
+
+# Use this logger for messages that ought to be used for diff output.
+test_logger = logging.getLogger('qemu.iotests.diff_io')
+
+
faulthandler.enable()
# This will not work if arguments contain spaces but is necessary if we
if isinstance(msg, (dict, list)):
# Don't sort if it's already sorted
do_sort = not isinstance(msg, OrderedDict)
- print(json.dumps(msg, sort_keys=do_sort, indent=indent))
+ test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
else:
- print(msg)
+ test_logger.info(msg)
class Timeout:
def __init__(self, seconds, errmsg="Timeout"):
# Returns None on success, and an error string on failure
def run_job(self, job, auto_finalize=True, auto_dismiss=False,
- pre_finalize=None, cancel=False, use_log=True, wait=60.0):
+ pre_finalize=None, cancel=False, wait=60.0):
"""
run_job moves a job from creation through to dismissal.
invoked prior to issuing job-finalize, if any.
:param cancel: Bool. When true, cancels the job after the pre_finalize
callback.
- :param use_log: Bool. When false, does not log QMP messages.
:param wait: Float. Timeout value specifying how long to wait for any
event, in seconds. Defaults to 60.0.
"""
while True:
ev = filter_qmp_event(self.events_wait(events, timeout=wait))
if ev['event'] != 'JOB_STATUS_CHANGE':
- if use_log:
- log(ev)
+ log(ev)
continue
status = ev['data']['status']
if status == 'aborting':
for j in result['return']:
if j['id'] == job:
error = j['error']
- if use_log:
- log('Job failed: %s' % (j['error']))
+ log('Job failed: %s' % (j['error']))
elif status == 'ready':
- if use_log:
- self.qmp_log('job-complete', id=job)
- else:
- self.qmp('job-complete', id=job)
+ self.qmp_log('job-complete', id=job)
elif status == 'pending' and not auto_finalize:
if pre_finalize:
pre_finalize()
- if cancel and use_log:
+ if cancel:
self.qmp_log('job-cancel', id=job)
- elif cancel:
- self.qmp('job-cancel', id=job)
- elif use_log:
- self.qmp_log('job-finalize', id=job)
else:
- self.qmp('job-finalize', id=job)
+ self.qmp_log('job-finalize', id=job)
elif status == 'concluded' and not auto_dismiss:
- if use_log:
- self.qmp_log('job-dismiss', id=job)
- else:
- self.qmp('job-dismiss', id=job)
+ self.qmp_log('job-dismiss', id=job)
elif status == 'null':
return error
seq = os.path.basename(sys.argv[0])
open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
- print('%s not run: %s' % (seq, reason))
+ logger.warning("%s not run: %s", seq, reason)
sys.exit(0)
def case_notrun(reason):
if debug:
sys.argv.remove('-d')
logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
+ logger.debug("iotests debugging messages active")
return debug
else:
test_function()
+def activate_logging():
+ """Activate iotests.log() output to stdout for script-style tests."""
+ handler = logging.StreamHandler(stream=sys.stdout)
+ formatter = logging.Formatter('%(message)s')
+ handler.setFormatter(formatter)
+ test_logger.addHandler(handler)
+ test_logger.setLevel(logging.INFO)
+ test_logger.propagate = False
+
# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
"""Initialize script-style tests without running any tests."""
+ activate_logging()
execute_setup_common(*args, **kwargs)
# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
"""Run script-style tests outside of the unittest framework"""
+ activate_logging()
execute_test(*args, test_function=test_function, **kwargs)
# This is called from unittest style iotests