468 lines
15 KiB
Python
Executable File
468 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Copyright 2020 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""Runs unit tests on device and displays the results.
|
|
|
|
This script assumes you have a ~/.servodrc config file with a line that
|
|
corresponds to the board being tested.
|
|
|
|
See https://chromium.googlesource.com/chromiumos/third_party/hdctools/+/HEAD/docs/servo.md#servodrc
|
|
"""
|
|
import argparse
|
|
import concurrent
|
|
import io
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from concurrent.futures.thread import ThreadPoolExecutor
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Optional, BinaryIO, List
|
|
|
|
import colorama # type: ignore[import]
|
|
|
|
# Directory one level above the directory that contains this script.
EC_DIR = Path(os.path.realpath(__file__)).parent.parent
# Flashing helper invoked by flash().
FLASH_SCRIPT = str(EC_DIR / 'util' / 'flash_jlink.py')

# Default patterns that end a test run (see TestConfig.finish_regexes).
ALL_TESTS_PASSED_REGEX = re.compile(r'Pass!\r\n')
ALL_TESTS_FAILED_REGEX = re.compile(r'Fail! \(\d+ tests\)\r\n')

# Patterns counted by process_console_output_line() as one pass / one failure.
SINGLE_CHECK_PASSED_REGEX = re.compile(r'Pass: .*')
SINGLE_CHECK_FAILED_REGEX = re.compile(r'.*failed:.*')

ASSERTION_FAILURE_REGEX = re.compile(r'ASSERTION FAILURE.*')


def _data_access_violation_regex(mfar):
    """Compile the console pattern for a data access violation at `mfar`."""
    return re.compile(r'Data access violation, mfar = ' + mfar + r'\r\n')


DATA_ACCESS_VIOLATION_8020000_REGEX = _data_access_violation_regex('8020000')
DATA_ACCESS_VIOLATION_8040000_REGEX = _data_access_violation_regex('8040000')
DATA_ACCESS_VIOLATION_80C0000_REGEX = _data_access_violation_regex('80c0000')
DATA_ACCESS_VIOLATION_80E0000_REGEX = _data_access_violation_regex('80e0000')
DATA_ACCESS_VIOLATION_20000000_REGEX = _data_access_violation_regex('20000000')

# Names of the supported boards.
BLOONCHIPPER = 'bloonchipper'
DARTMONKEY = 'dartmonkey'
|
|
|
|
|
|
class ImageType(Enum):
    """Selects which EC image (RO or RW) a test is run with."""

    # run_test() issues a 'reboot ro' console command for this image.
    RO = 1
    # Default image for a TestConfig.
    RW = 2
|
|
|
|
|
|
class BoardConfig:
    """Per-board settings used when driving a board through dut-control.

    Attributes:
        name: Board name.
        servo_uart_name: dut-control control whose value is the console pty.
        servo_power_enable: dut-control control used to switch board power.
        rollback_region0_regex: Finish pattern for the rollback region0 test.
        rollback_region1_regex: Finish pattern for the rollback region1 test.
    """

    def __init__(self, name, servo_uart_name, servo_power_enable,
                 rollback_region0_regex, rollback_region1_regex):
        """Store the given values unchanged on the instance."""
        # Identity and servo controls.
        self.name = name
        self.servo_uart_name = servo_uart_name
        self.servo_power_enable = servo_power_enable

        # Expected console output of the two rollback tests.
        self.rollback_region0_regex = rollback_region0_regex
        self.rollback_region1_regex = rollback_region1_regex
|
|
|
|
|
|
class TestConfig:
    """Settings for one test plus the results gathered while running it.

    Attributes:
        name: Name of the test image (used by build() and flash()).
        image_to_use: ImageType to run the test with.
        finish_regexes: Patterns; run_test() stops at the first console line
            matching any of them.
        test_args: Words appended to the 'runtest' console command.
        toggle_power: Whether main() power-cycles the board after flashing.
        num_flash_attempts: How many times main() tries to flash the image.
        timeout_secs: Time budget checked by run_test().
        enable_hw_write_protect: State passed to hw_write_protect().
        logs: Raw console output collected by run_test().
        passed: Final verdict, set by main().
        num_fails / num_passes: Counters updated while parsing the console.
    """

    def __init__(self, name, image_to_use=ImageType.RW, finish_regexes=None,
                 toggle_power=False, test_args=None, num_flash_attempts=2,
                 timeout_secs=10, enable_hw_write_protect=False):
        """Record the configuration and reset all result fields."""
        self.name = name
        self.image_to_use = image_to_use
        # None means "finish on the overall pass/fail line".
        self.finish_regexes = (
            [ALL_TESTS_PASSED_REGEX, ALL_TESTS_FAILED_REGEX]
            if finish_regexes is None else finish_regexes)
        # None means "no extra arguments"; a fresh list avoids sharing state.
        self.test_args = [] if test_args is None else test_args
        self.toggle_power = toggle_power
        self.num_flash_attempts = num_flash_attempts
        self.timeout_secs = timeout_secs
        self.enable_hw_write_protect = enable_hw_write_protect

        # Results, filled in as the test runs.
        self.logs = []
        self.passed = False
        self.num_fails = 0
        self.num_passes = 0
|
|
|
|
|
|
# All possible tests.
|
|
class AllTests:
    """Catalogue of every test this script can run."""

    @staticmethod
    def get(board_config: BoardConfig):
        """Build a new dict mapping test key to TestConfig for a board.

        The dict keeps the order below, which is also the order in which
        get_test_list() returns the tests when all of them are requested.
        Every call creates new TestConfig objects.
        """
        ro = ImageType.RO
        entries = [
            ('aes', TestConfig(name='aes')),
            ('crc', TestConfig(name='crc')),
            ('flash_physical',
             TestConfig(name='flash_physical', image_to_use=ro,
                        toggle_power=True)),
            ('flash_write_protect',
             TestConfig(name='flash_write_protect', image_to_use=ro,
                        toggle_power=True, enable_hw_write_protect=True)),
            ('fpsensor_spi_ro',
             TestConfig(name='fpsensor', image_to_use=ro,
                        test_args=['spi'])),
            ('fpsensor_spi_rw',
             TestConfig(name='fpsensor', test_args=['spi'])),
            ('fpsensor_uart_ro',
             TestConfig(name='fpsensor', image_to_use=ro,
                        test_args=['uart'])),
            ('fpsensor_uart_rw',
             TestConfig(name='fpsensor', test_args=['uart'])),
            # The mpu tests finish on a data access violation rather than on
            # the usual pass/fail line.
            ('mpu_ro',
             TestConfig(name='mpu', image_to_use=ro,
                        finish_regexes=[
                            DATA_ACCESS_VIOLATION_20000000_REGEX])),
            ('mpu_rw',
             TestConfig(name='mpu',
                        finish_regexes=[
                            DATA_ACCESS_VIOLATION_20000000_REGEX])),
            ('mutex', TestConfig(name='mutex')),
            ('pingpong', TestConfig(name='pingpong')),
            # The rollback tests finish on a board-specific pattern.
            ('rollback_region0',
             TestConfig(name='rollback',
                        finish_regexes=[board_config.rollback_region0_regex],
                        test_args=['region0'])),
            ('rollback_region1',
             TestConfig(name='rollback',
                        finish_regexes=[board_config.rollback_region1_regex],
                        test_args=['region1'])),
            ('rollback_entropy',
             TestConfig(name='rollback_entropy', image_to_use=ro)),
            ('rtc', TestConfig(name='rtc')),
            ('sha256', TestConfig(name='sha256')),
            ('sha256_unrolled', TestConfig(name='sha256_unrolled')),
            ('utils', TestConfig(name='utils', timeout_secs=20)),
        ]

        # This test is only registered for bloonchipper.
        if board_config.name == BLOONCHIPPER:
            entries.append(('stm32f_rtc', TestConfig(name='stm32f_rtc')))

        return dict(entries)
|
|
|
|
|
|
# Both boards use the same servo controls; only the rollback patterns differ.
_FPMCU_SERVO_CONTROLS = {
    'servo_uart_name': 'raw_fpmcu_uart_pty',
    'servo_power_enable': 'spi1_vref',
}

BLOONCHIPPER_CONFIG = BoardConfig(
    name=BLOONCHIPPER,
    rollback_region0_regex=DATA_ACCESS_VIOLATION_8020000_REGEX,
    rollback_region1_regex=DATA_ACCESS_VIOLATION_8040000_REGEX,
    **_FPMCU_SERVO_CONTROLS,
)

DARTMONKEY_CONFIG = BoardConfig(
    name=DARTMONKEY,
    rollback_region0_regex=DATA_ACCESS_VIOLATION_80C0000_REGEX,
    rollback_region1_regex=DATA_ACCESS_VIOLATION_80E0000_REGEX,
    **_FPMCU_SERVO_CONTROLS,
)

# Lookup table from the --board argument to its configuration.
BOARD_CONFIGS = {
    board.name: board
    for board in (BLOONCHIPPER_CONFIG, DARTMONKEY_CONFIG)
}
|
|
|
|
|
|
def get_console(board_name: str, board_config: BoardConfig) -> Optional[str]:
    """Look up the console device of a board through dut-control.

    Runs `dut-control -n <board_name> <servo_uart_name>` and scans its output
    for a line of the form `<servo_uart_name>:<value>`.

    Returns:
        The value after the colon with surrounding whitespace removed, or
        None when no output line has that form.
    """
    uart_control = board_config.servo_uart_name
    cmd = ['dut-control', '-n', board_name, uart_control]
    logging.debug('Running command: "%s"', ' '.join(cmd))

    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
        for line in io.TextIOWrapper(proc.stdout):  # type: ignore[arg-type]
            logging.debug(line)
            # Accept only lines containing exactly one ':' separator.
            key, separator, value = line.partition(':')
            if separator and ':' not in value and key == uart_control:
                return value.strip()

    return None
|
|
|
|
|
|
def power(board_name: str, board_config: BoardConfig, on: bool) -> None:
    """Switch board power on or off through dut-control.

    Sets the board's servo_power_enable control to 'pp3300' when `on` is
    true and to 'off' otherwise.

    Raises:
        subprocess.CalledProcessError: dut-control exited with a non-zero
            status.
    """
    state = 'pp3300' if on else 'off'
    setting = ':'.join((board_config.servo_power_enable, state))
    cmd = ['dut-control', '-n', board_name, setting]
    logging.debug('Running command: "%s"', ' '.join(cmd))
    subprocess.run(cmd, check=True)
|
|
|
|
|
|
def hw_write_protect(board_name: str, enable: bool) -> None:
    """Set hardware write protect through dut-control.

    Sets the 'fw_wp_en' control to 'on' when `enable` is true and to 'off'
    otherwise.

    Raises:
        subprocess.CalledProcessError: dut-control exited with a non-zero
            status.
    """
    state = 'on' if enable else 'off'
    cmd = ['dut-control', '-n', board_name, 'fw_wp_en:' + state]
    logging.debug('Running command: "%s"', ' '.join(cmd))
    subprocess.run(cmd, check=True)
|
|
|
|
|
|
def build(test_name: str, board_name: str) -> None:
    """Build the image of one test for one board with make.

    Runs `make BOARD=<board_name> test-<test_name> -j` in the current
    working directory.

    Raises:
        subprocess.CalledProcessError: make exited with a non-zero status.
    """
    target = 'test-' + test_name
    cmd = ['make', 'BOARD=' + board_name, target, '-j']

    logging.debug('Running command: "%s"', ' '.join(cmd))
    subprocess.run(cmd, check=True)
|
|
|
|
|
|
def flash(test_name: str, board: str) -> bool:
    """Flash the image of one test onto a board.

    The image is expected at
    `<EC_DIR>/build/<board>/<test_name>/<test_name>.bin`.

    Returns:
        True when the flash script exits with status 0, False otherwise.
    """
    logging.info("Flashing test")

    image = os.path.join(EC_DIR, 'build', board, test_name,
                         test_name + '.bin')

    # TODO(b/151105339): Support ./util/flash_ec as well. It's slower, but only
    # requires servo micro.
    cmd = [FLASH_SCRIPT, '--board', board, '--image', image]
    logging.debug('Running command: "%s"', ' '.join(cmd))
    return subprocess.run(cmd).returncode == 0
|
|
|
|
|
|
# Reads that timed out in readline() but are still running on the executor,
# keyed by id() of the file object they read from. Each value is a
# (file, future) pair; keeping the file referenced prevents its id() from
# being reused by another object while the entry exists.
_PENDING_READS = {}


def readline(executor: ThreadPoolExecutor, f: BinaryIO, timeout_secs: int) -> \
        Optional[bytes]:
    """Read a line with timeout.

    `f.readline` runs on `executor` so the wait can be bounded. A read that
    times out cannot be cancelled and keeps running in the background; it is
    remembered, and the next call for the same `f` waits on it instead of
    submitting a new read. Without this, the line eventually returned by the
    abandoned read would be discarded.

    Args:
        executor: Executor on which the blocking read runs.
        f: File object to read from.
        timeout_secs: Maximum time in seconds to wait for a line.

    Returns:
        The value returned by `f.readline()`, or None when nothing was
        available within `timeout_secs`.

    Raises:
        Whatever `f.readline()` raises; the exception is re-raised here.
    """
    key = id(f)
    pending = _PENDING_READS.pop(key, None)
    if pending is None:
        future = executor.submit(f.readline)
    else:
        # Resume the read left over from an earlier timed-out call.
        future = pending[1]

    try:
        return future.result(timeout_secs)
    except concurrent.futures.TimeoutError:
        _PENDING_READS[key] = (f, future)
        return None
|
|
|
|
|
|
def readlines_until_timeout(executor, f: BinaryIO, timeout_secs: int) -> \
        List[bytes]:
    """Collect lines from `f` until one read yields nothing.

    Each individual read is given `timeout_secs`; collection stops at the
    first read that times out or returns an empty value.

    Returns:
        The lines read, in order (possibly an empty list).
    """
    collected: List[bytes] = []
    chunk = readline(executor, f, timeout_secs)
    while chunk:
        collected.append(chunk)
        chunk = readline(executor, f, timeout_secs)
    return collected
|
|
|
|
|
|
def process_console_output_line(line: bytes, test: TestConfig):
    """Decode one console line and update the pass/fail counters of `test`.

    `test.num_passes` grows by one when the line reports a passed check.
    `test.num_fails` grows by one for each failure pattern the line matches
    (failed check, overall failure, assertion failure).

    Returns:
        The decoded line, or None when the bytes cannot be decoded.
    """
    try:
        text = line.decode()
    except UnicodeDecodeError:
        # Sometimes we get non-unicode from the console (e.g., when the
        # board reboots.) Not much we can do in this case, so we'll just
        # ignore it.
        return None

    if SINGLE_CHECK_PASSED_REGEX.match(text):
        test.num_passes += 1

    failure_patterns = (
        SINGLE_CHECK_FAILED_REGEX,
        ALL_TESTS_FAILED_REGEX,
        ASSERTION_FAILURE_REGEX,
    )
    for pattern in failure_patterns:
        if pattern.match(text):
            test.num_fails += 1

    return text
|
|
|
|
|
|
def run_test(test: TestConfig, console: str, executor: ThreadPoolExecutor) ->\
        bool:
    """Run specified test.

    Opens the console device, optionally reboots into the RO image, sends the
    'runtest' command and parses console output until a line matches one of
    `test.finish_regexes` or the time budget is exhausted. Every raw line
    read is appended to `test.logs`.

    Args:
        test: Test to run; its logs and counters are updated in place.
        console: Path of the console device to open.
        executor: Executor used for the timed console reads.

    Returns:
        True when a finish pattern was seen and no failure was counted.
        False when failures were counted or the test timed out.
    """
    start = time.time()
    with open(console, "wb+", buffering=0) as c:
        # Wait for boot to finish
        time.sleep(1)
        c.write(b'\n')
        if test.image_to_use == ImageType.RO:
            c.write(b'reboot ro\n')
            time.sleep(1)

        test_cmd = 'runtest ' + ' '.join(test.test_args) + '\n'
        c.write(test_cmd.encode())

        while True:
            c.flush()
            line = readline(executor, c, 1)
            if not line:
                # NOTE: the time budget is only checked when a read yields
                # nothing, and it includes the sleeps above.
                if time.time() - start > test.timeout_secs:
                    logging.debug("Test timed out")
                    return False
                continue

            logging.debug(line)
            test.logs.append(line)
            # Look for test_print_result() output (success or failure)
            line_str = process_console_output_line(line, test)
            if line_str is None:
                # Sometimes we get non-unicode from the console (e.g., when the
                # board reboots.) Not much we can do in this case, so we'll just
                # ignore it.
                continue

            if any(r.match(line_str) for r in test.finish_regexes):
                # Drain whatever the board still prints after the finish line.
                trailing = readlines_until_timeout(executor, c, 1)
                logging.debug(trailing)
                # extend, not append: logs is a flat list of byte lines.
                test.logs.extend(trailing)

                for extra in trailing:
                    process_console_output_line(extra, test)

                return test.num_fails == 0
|
|
|
|
|
|
def get_test_list(config: BoardConfig, test_args) -> List[TestConfig]:
    """Get a list of tests to run.

    Args:
        config: Board whose test catalogue is used.
        test_args: Either the string 'all', or an iterable of test keys as
            defined by AllTests.get(). The single-element list ['all'] is
            treated like 'all'.

    Returns:
        The TestConfig objects to run, in the requested order (catalogue
        order for 'all').

    Exits the process with status 1 when a requested key is unknown.
    """
    # main() declares --tests with nargs='+' and default 'all', so the default
    # arrives as the bare string while an explicit `--tests all` arrives as
    # ['all']. Accept both; previously the latter was rejected as unknown.
    if test_args == 'all' or list(test_args) == ['all']:
        return list(AllTests.get(config).values())

    test_list = []
    for t in test_args:
        logging.debug('test: %s', t)
        # A fresh catalogue per name gives a test requested twice two
        # independent TestConfig objects (they hold per-run state).
        test_config = AllTests.get(config).get(t)
        if test_config is None:
            logging.error('Unable to find test config for "%s"', t)
            sys.exit(1)
        test_list.append(test_config)

    return test_list
|
|
|
|
|
|
def _flash_with_retries(test: TestConfig, board: str) -> bool:
    """Flash the image of `test`, trying up to test.num_flash_attempts times."""
    # TODO(b/158327221): First attempt to flash fails after
    # flash_write_protect test is run; works after second attempt.
    for attempt in range(1, test.num_flash_attempts + 1):
        logging.debug('Flash attempt %d', attempt)
        if flash(test.name, board):
            return True
        time.sleep(1)
    return False


def _print_results(test_list: List[TestConfig]) -> int:
    """Print one PASSED/FAILED line per test and return the exit code."""
    colorama.init()
    exit_code = 0
    for test in test_list:
        print('Test "' + test.name + '": ', end='')
        if test.passed:
            print(colorama.Fore.GREEN + 'PASSED')
        else:
            print(colorama.Fore.RED + 'FAILED')
            exit_code = 1

        print(colorama.Style.RESET_ALL)
    return exit_code


def main():
    """Build, flash and run the requested tests, then print a summary.

    Command line options: --board/-b, --tests/-t and --log_level/-l.

    Never returns normally: exits with status 0 when every test passed and
    with status 1 when any test failed or the board is unknown. A failing
    build or dut-control call propagates as subprocess.CalledProcessError.
    """
    parser = argparse.ArgumentParser()

    default_board = 'bloonchipper'
    parser.add_argument(
        '--board', '-b',
        help='Board (default: ' + default_board + ')',
        default=default_board)

    default_tests = 'all'
    parser.add_argument(
        '--tests', '-t',
        nargs='+',
        help='Tests (default: ' + default_tests + ')',
        default=default_tests)

    log_level_choices = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
    parser.add_argument(
        '--log_level', '-l',
        choices=log_level_choices,
        default='DEBUG'
    )

    args = parser.parse_args()
    logging.basicConfig(level=args.log_level)

    if args.board not in BOARD_CONFIGS:
        logging.error('Unable to find a config for board: "%s"', args.board)
        sys.exit(1)

    board_config = BOARD_CONFIGS[args.board]

    # A single worker performs the timed console reads for every test.
    e = ThreadPoolExecutor(max_workers=1)

    test_list = get_test_list(board_config, args.tests)
    logging.debug('Running tests: %s', [t.name for t in test_list])

    for test in test_list:
        # build test binary
        build(test.name, args.board)

        # flash test binary
        if not _flash_with_retries(test, args.board):
            logging.debug('Flashing failed after max attempts: %d',
                          test.num_flash_attempts)
            test.passed = False
            continue

        if test.toggle_power:
            power(args.board, board_config, on=False)
            time.sleep(1)
            power(args.board, board_config, on=True)

        hw_write_protect(args.board, test.enable_hw_write_protect)

        # run the test
        logging.info('Running test: "%s"', test.name)
        console = get_console(args.board, board_config)
        if console is None:
            # get_console() found no console; opening None would raise a
            # TypeError in run_test() and abort the remaining tests.
            logging.error('Unable to find console for board: "%s"',
                          args.board)
            test.passed = False
            continue
        test.passed = run_test(test, console, executor=e)

    exit_code = _print_results(test_list)

    e.shutdown(wait=False)
    sys.exit(exit_code)
|
|
|
|
|
|
if __name__ == '__main__':
    # Run only when executed as a script; main() ends the process itself.
    raise SystemExit(main())
|