chrome-ec/util/run_host_test

133 lines
3.8 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper that runs a host test. Handles timeout and stopping the emulator."""
from __future__ import print_function
import argparse
import enum
import io
import os
import pathlib
import select
import subprocess
import sys
import time
class TestResult(enum.Enum):
"""An Enum representing the result of running a test."""
SUCCESS = enum.auto()
FAIL = enum.auto()
TIMEOUT = enum.auto()
UNEXPECTED_TERMINATION = enum.auto()
@property
def exit_code(self):
if self is TestResult.SUCCESS:
return 0
return 1
@property
def reason(self):
return {
TestResult.SUCCESS: 'passed',
TestResult.FAIL: 'failed',
TestResult.TIMEOUT: 'timed out',
TestResult.UNEXPECTED_TERMINATION: 'terminated unexpectedly',
}[self]
def run_test(path, timeout=10):
start_time = time.monotonic()
env = dict(os.environ)
env['ASAN_OPTIONS'] = 'log_path=stderr'
proc = subprocess.Popen(
[path],
bufsize=0,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
env=env)
# Put the output pipe in non-blocking mode. We will then select(2)
# on the pipe to know when we have bytes to process.
os.set_blocking(proc.stdout.fileno(), False)
try:
output_buffer = io.BytesIO()
while True:
select_timeout = timeout - (time.monotonic() - start_time)
if select_timeout <= 0:
return TestResult.TIMEOUT, output_buffer.getvalue()
readable, _, _ = select.select([proc.stdout], [], [], select_timeout)
if not readable:
# Indicates that select(2) timed out.
return TestResult.TIMEOUT, output_buffer.getvalue()
output_buffer.write(proc.stdout.read())
output_log = output_buffer.getvalue()
if b'Pass!' in output_log:
return TestResult.SUCCESS, output_log
if b'Fail!' in output_log:
return TestResult.FAIL, output_log
if proc.poll():
return TestResult.UNEXPECTED_TERMINATION, output_log
finally:
# Check if the process has exited. If not, send it a SIGTERM, wait for it
# to exit, and if it times out, kill the process directly.
if not proc.poll():
try:
proc.terminate()
proc.wait(timeout)
except subprocess.TimeoutExpired:
proc.kill()
def parse_options(argv):
parser = argparse.ArgumentParser()
parser.add_argument('-t', '--timeout', type=float, default=60,
help='Timeout to kill test after.')
parser.add_argument('--coverage', action='store_const', const='coverage',
default='host', dest='test_target',
help='Flag if this is a code coverage test.')
parser.add_argument('test_name', type=str)
return parser.parse_args(argv)
def main(argv):
opts = parse_options(argv)
# Tests will be located in build/host, unless the --coverage flag was
# provided, in which case they will be in build/coverage.
exec_path = pathlib.Path('build', opts.test_target, opts.test_name,
f'{opts.test_name}.exe')
if not exec_path.is_file():
print(f'No test named {opts.test_name} exists!')
return 1
start_time = time.monotonic()
result, output = run_test(exec_path, timeout=opts.timeout)
elapsed_time = time.monotonic() - start_time
print('{} {}! ({:.3f} seconds)'.format(
opts.test_name, result.reason, elapsed_time),
file=sys.stderr)
if result is not TestResult.SUCCESS:
print('====== Emulator output ======', file=sys.stderr)
print(output.decode('utf-8'), file=sys.stderr)
print('=============================', file=sys.stderr)
return result.exit_code
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))