CI: new CI unit test framework, which can also run on a local PC

houchenyao 2018-01-31 18:59:10 +08:00 committed by bot
parent cb3c88fb2f
commit 5b8a9478a3
9 changed files with 778 additions and 162 deletions

View File

@@ -446,22 +446,21 @@ assign_test:
- components/idf_test/*/CIConfigs
- components/idf_test/*/TC.sqlite
- $EXAMPLE_CONFIG_OUTPUT_PATH
- tools/unit-test-app/output
expire_in: 1 mos
before_script: *add_gitlab_key_before
script:
# first move test bins together: test_bins/CHIP_SDK/TestApp/bin_files
- mkdir -p $OUTPUT_BIN_PATH
# copy each config folder, renaming it to "UT_<config>"
- for CONFIG in $(ls $UT_BIN_PATH); do cp -r "$UT_BIN_PATH/$CONFIG" "$OUTPUT_BIN_PATH/UT_$CONFIG"; done
- cp -r SSC/ssc_bin/* $OUTPUT_BIN_PATH
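# resulting layout (a sketch; config names depend on the built sdkconfigs):
#   $OUTPUT_BIN_PATH/UT_default, $OUTPUT_BIN_PATH/UT_psram, ..., plus the SSC bins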
# assign example tests
- python $TEST_FW_PATH/CIAssignExampleTest.py $IDF_PATH/examples $IDF_PATH/.gitlab-ci.yml $EXAMPLE_CONFIG_OUTPUT_PATH
# assign unit test cases
- python $TEST_FW_PATH/CIAssignUnitTest.py $IDF_PATH/components/idf_test/unit_test/TestCaseAll.yml $IDF_PATH/.gitlab-ci.yml $IDF_PATH/components/idf_test/unit_test/CIConfigs
# clone test script to assign tests
- git clone $TEST_SCRIPT_REPOSITORY
- cd auto_test_script
- python $CHECKOUT_REF_SCRIPT auto_test_script
# assign unit test cases
- python CIAssignTestCases.py -t $IDF_PATH/components/idf_test/unit_test -c $IDF_PATH/.gitlab-ci.yml -b $IDF_PATH/test_bins
# assign integration test cases
- python CIAssignTestCases.py -t $IDF_PATH/components/idf_test/integration_test -c $IDF_PATH/.gitlab-ci.yml -b $IDF_PATH/test_bins
@@ -493,6 +492,17 @@ assign_test:
# run test
- python Runner.py $TEST_CASE_PATH -c $CONFIG_FILE
.unit_test_template: &unit_test_template
<<: *example_test_template
stage: unit_test
dependencies:
- assign_test
variables:
TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
TEST_CASE_PATH: "$CI_PROJECT_DIR/tools/unit-test-app"
CONFIG_FILE: "$CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/$CI_JOB_NAME.yml"
LOG_PATH: "$CI_PROJECT_DIR/TEST_LOGS"
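# As an illustration (job name and tags are hypothetical), a concrete unit test
# job only extends the anchor and selects its runner tags; CONFIG_FILE then
# resolves to CIConfigs/<job name>.yml:
#
# UT_001_01:
#   <<: *unit_test_template
#   tags:
#     - ESP32_IDF
#     - UT_T1_1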
.test_template: &test_template
stage: test
when: on_success
@@ -530,18 +540,6 @@ assign_test:
# run test
- python CIRunner.py -l "$LOG_PATH/$CI_JOB_NAME" -c $CONFIG_FILE -e $LOCAL_ENV_CONFIG_PATH -t $TEST_CASE_FILE_PATH -m $MODULE_UPDATE_FILE
# template for unit test jobs
.unit_test_template: &unit_test_template
<<: *test_template
allow_failure: false
stage: unit_test
variables:
LOCAL_ENV_CONFIG_PATH: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/ESP32_IDF"
LOG_PATH: "$CI_PROJECT_DIR/$CI_COMMIT_SHA"
TEST_CASE_FILE_PATH: "$CI_PROJECT_DIR/components/idf_test/unit_test"
MODULE_UPDATE_FILE: "$CI_PROJECT_DIR/components/idf_test/ModuleDefinition.yml"
CONFIG_FILE: "$CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/$CI_JOB_NAME.yml"
nvs_compatible_test:
<<: *test_template
artifacts:

View File

@@ -22,147 +22,16 @@ import sys
import re
import argparse
import yaml
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path:
sys.path.insert(0, test_fw_path)
from Utility import CaseConfig, SearchCases, GitlabCIJob
from Utility.CIAssignTest import AssignTest
class Group(object):
MAX_EXECUTION_TIME = 30
MAX_CASE = 15
SORT_KEYS = ["env_tag"]
def __init__(self, case):
self.execution_time = 0
self.case_list = [case]
self.filters = dict(zip(self.SORT_KEYS, [case.case_info[x] for x in self.SORT_KEYS]))
def accept_new_case(self):
"""
check if allowed to add any case to this group
:return: True or False
"""
max_time = (sum([x.case_info["execution_time"] for x in self.case_list]) < self.MAX_EXECUTION_TIME)
max_case = (len(self.case_list) < self.MAX_CASE)
return max_time and max_case
def add_case(self, case):
"""
add case to current group
:param case: test case
:return: True if add succeed, else False
"""
added = False
if self.accept_new_case():
for key in self.filters:
if case.case_info[key] != self.filters[key]:
break
else:
self.case_list.append(case)
added = True
return added
def output(self):
"""
output data for job configs
:return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
"""
output_data = {
"Filter": self.filters,
"CaseConfig": [{"name": x.case_info["name"]} for x in self.case_list],
}
return output_data
class AssignTest(object):
"""
Auto assign tests to CI jobs.
:param test_case: path of test case file(s)
:param ci_config_file: path of ``.gitlab-ci.yml``
"""
class CIExampleAssignTest(AssignTest):
CI_TEST_JOB_PATTERN = re.compile(r"^example_test_.+")
def __init__(self, test_case, ci_config_file):
self.test_cases = self._search_cases(test_case)
self.jobs = self._parse_gitlab_ci_config(ci_config_file)
def _parse_gitlab_ci_config(self, ci_config_file):
with open(ci_config_file, "r") as f:
ci_config = yaml.load(f)
job_list = list()
for job_name in ci_config:
if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
return job_list
@staticmethod
def _search_cases(test_case, case_filter=None):
"""
:param test_case: path that contains the test case folders
:param case_filter: filter for test cases
:return: filtered test case list
"""
test_methods = SearchCases.Search.search_test_cases(test_case)
return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict())
def _group_cases(self):
"""
separate all cases into groups according to the group rules. Each group will be executed by one CI job.
:return: test case groups.
"""
groups = []
for case in self.test_cases:
for group in groups:
# add to current group
if group.add_case(case):
break
else:
# create new group
groups.append(Group(case))
return groups
def assign_cases(self):
"""
separate test cases to groups and assign test cases to CI jobs.
:raise AssertionError: if it failed to assign any case to a CI job.
:return: None
"""
failed_to_assign = []
test_groups = self._group_cases()
for group in test_groups:
for job in self.jobs:
if job.match_group(group):
job.assign_group(group)
break
else:
failed_to_assign.append(group)
assert not failed_to_assign
def output_configs(self, output_path):
"""
:param output_path: path to output config files for each CI job
:return: None
"""
if not os.path.exists(output_path):
os.makedirs(output_path)
for job in self.jobs:
job.output_config(output_path)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
@@ -174,6 +43,6 @@ if __name__ == '__main__':
help="output path of config files")
args = parser.parse_args()
assign_test = AssignTest(args.test_case, args.ci_config_file)
assign_test = CIExampleAssignTest(args.test_case, args.ci_config_file)
assign_test.assign_cases()
assign_test.output_configs(args.output_path)
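# usage, as invoked by the assign_test CI job shown earlier:
#   python CIAssignExampleTest.py $IDF_PATH/examples $IDF_PATH/.gitlab-ci.yml $EXAMPLE_CONFIG_OUTPUT_PATH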

View File

@@ -0,0 +1,121 @@
"""
Command line tool to assign unit tests to CI test jobs.
"""
import re
import os
import sys
import argparse
import yaml
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path:
sys.path.insert(0, test_fw_path)
from Utility import CIAssignTest
class Group(CIAssignTest.Group):
SORT_KEYS = ["Test App", "SDK", "test environment"]
MAX_CASE = 30
ATTR_CONVERT_TABLE = {
"execution_time": "execution time"
}
@staticmethod
def _get_case_attr(case, attr):
if attr in Group.ATTR_CONVERT_TABLE:
attr = Group.ATTR_CONVERT_TABLE[attr]
return case[attr]
@staticmethod
def _get_ut_config(test_app):
# we format the test app name as "UT_" + config when parsing test cases;
# now we need to extract the config
assert test_app[:3] == "UT_"
return test_app[3:]
def _create_extra_data(self):
case_data = []
for case in self.case_list:
if self._get_case_attr(case, "cmd set") == "multiple_devices_case":
case_data.append({
"config": self._get_ut_config(self._get_case_attr(case, "Test App")),
"name": self._get_case_attr(case, "summary"),
"child case num": self._get_case_attr(case, "child case num")
})
else:
case_data.append({
"config": self._get_ut_config(self._get_case_attr(case, "Test App")),
"name": self._get_case_attr(case, "summary"),
"reset": self._get_case_attr(case, "reset") ,
})
return case_data
def output(self):
"""
output data for job configs
:return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
"""
output_data = {
# we don't need a filter for the test function, as UT uses only a few test functions for all cases
"CaseConfig": [
{
"name": self.case_list[0]["cmd set"] if isinstance(self.case_list[0]["cmd set"], str) else self.case_list[0]["cmd set"][0],
"extra_data": self._create_extra_data(),
}
]
}
return output_data
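# a sketch of the YAML a UT job receives from this output (case name is
# illustrative; the structure follows output() above):
#
#   CaseConfig:
#   - extra_data:
#     - config: default
#       name: UART init test
#       reset: []
#     name: test_unit_test_case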
class UnitTestAssignTest(CIAssignTest.AssignTest):
CI_TEST_JOB_PATTERN = re.compile(r"^UT_.+")
def __init__(self, test_case_path, ci_config_file):
CIAssignTest.AssignTest.__init__(self, test_case_path, ci_config_file, case_group=Group)
@staticmethod
def _search_cases(test_case_path, case_filter=None):
"""
For unit test cases, we don't search for test functions.
The unit test cases are stored in a yaml file which is created by the build-idf-test job.
"""
with open(test_case_path, "r") as f:
raw_data = yaml.load(f)
test_cases = raw_data["test cases"]
if case_filter:
for key in case_filter:
filtered_cases = []
for case in test_cases:
try:
# the bot converts strings to lower case
if isinstance(case[key], str):
_value = case[key].lower()
else:
_value = case[key]
if _value in case_filter[key]:
filtered_cases.append(case)
except KeyError:
# the case doesn't have this key; regard it as passing the filter
filtered_cases.append(case)
test_cases = filtered_cases
return test_cases
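# for reference, the yaml parsed above is expected to look roughly like this
# (keys taken from the attributes accessed in this file; values illustrative):
#
#   test cases:
#   - Test App: UT_default
#     SDK: ESP32_IDF
#     summary: UART init test
#     test environment: UT_T1_1
#     cmd set: test_unit_test_case
#     execution time: 0
#     reset: ''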
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("test_case",
help="test case folder or file")
parser.add_argument("ci_config_file",
help="gitlab ci config file")
parser.add_argument("output_path",
help="output path of config files")
args = parser.parse_args()
assign_test = UnitTestAssignTest(args.test_case, args.ci_config_file)
assign_test.assign_cases()
assign_test.output_configs(args.output_path)

View File

@@ -144,11 +144,28 @@ class Example(IDFApp):
class UT(IDFApp):
def get_binary_path(self, app_path):
if app_path:
# specified path, join it and the idf path
path = os.path.join(self.idf_path, app_path)
else:
path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
"""
:param app_path: app path or app config
:return: binary path
"""
if not app_path:
app_path = "default"
path = os.path.join(self.idf_path, app_path)
if not os.path.exists(path):
while True:
# try to get by config
if app_path == "default":
# it's the default config; we first try to get the binary from the build folder of unit-test-app
path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
if os.path.exists(path):
# found, use bin in build path
break
# ``make ut-build-all-configs`` or ``make ut-build-CONFIG`` will copy binaries to the output folder
path = os.path.join(self.idf_path, "tools", "unit-test-app", "output", app_path)
if os.path.exists(path):
break
raise OSError("Failed to get unit-test-app binary path")
return path
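# resolution order sketch (paths relative to IDF_PATH; "psram" is an
# illustrative config name):
#   get_binary_path("psram") -> tools/unit-test-app/output/psram
#       (populated by ``make ut-build-all-configs`` / ``make ut-build-psram``)
#   get_binary_path("")      -> treated as "default": tools/unit-test-app/build
#       if it exists, otherwise tools/unit-test-app/output/default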

View File

@@ -0,0 +1,206 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common logic to assign test cases to CI jobs.
Some background knowledge about Gitlab CI and how it is used in esp-idf:
* Gitlab CI jobs are statically defined in ``.gitlab-ci.yml``; we can't create test jobs dynamically
* For test jobs running on a DUT, we use ``tags`` to select runners with different test environments
* We have an ``assign_test`` stage, which collects cases and then assigns them to the correct test jobs
* ``assign_test`` will fail if it fails to assign any case
* with ``assign_test``, we can:
* dynamically filter the test cases we want to run
* alert users if they forget to add CI jobs, and guide them on how to add test jobs
* the last step of ``assign_test`` is to output config files; test jobs will then run these cases
The basic logic to assign test cases is as follows:
1. search all the cases
2. filter cases (if a filter is specified by @bot)
3. put cases into different groups according to the rules of ``Group``
* try to put them in existing groups
* if that fails, create a new group and add the case
4. parse and filter the test jobs from the CI config file
5. try to assign all groups to jobs according to tags
6. output config files for the jobs
"""
import os
import re
import json
import yaml
from Utility import (CaseConfig, SearchCases, GitlabCIJob)
class Group(object):
MAX_EXECUTION_TIME = 30
MAX_CASE = 15
SORT_KEYS = ["env_tag"]
def __init__(self, case):
self.execution_time = 0
self.case_list = [case]
self.filters = dict(zip(self.SORT_KEYS, [self._get_case_attr(case, x) for x in self.SORT_KEYS]))
@staticmethod
def _get_case_attr(case, attr):
# we might use different types for cases (dict or test_func);
# this method gets an attribute from a case
return case.case_info[attr]
def accept_new_case(self):
"""
check if allowed to add any case to this group
:return: True or False
"""
max_time = (sum([self._get_case_attr(x, "execution_time") for x in self.case_list])
< self.MAX_EXECUTION_TIME)
max_case = (len(self.case_list) < self.MAX_CASE)
return max_time and max_case
def add_case(self, case):
"""
add case to current group
:param case: test case
:return: True if add succeed, else False
"""
added = False
if self.accept_new_case():
for key in self.filters:
if self._get_case_attr(case, key) != self.filters[key]:
break
else:
self.case_list.append(case)
added = True
return added
def output(self):
"""
output data for job configs
:return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
"""
output_data = {
"Filter": self.filters,
"CaseConfig": [{"name": self._get_case_attr(x, "name")} for x in self.case_list],
}
return output_data
class AssignTest(object):
"""
Auto assign tests to CI jobs.
:param test_case_path: path of test case file(s)
:param ci_config_file: path of ``.gitlab-ci.yml``
"""
# subclasses need to override the CI test job pattern to filter all test jobs
CI_TEST_JOB_PATTERN = re.compile(r"^test_.+")
def __init__(self, test_case_path, ci_config_file, case_group=Group):
self.test_case_path = test_case_path
self.test_cases = []
self.jobs = self._parse_gitlab_ci_config(ci_config_file)
self.case_group = case_group
def _parse_gitlab_ci_config(self, ci_config_file):
with open(ci_config_file, "r") as f:
ci_config = yaml.load(f)
job_list = list()
for job_name in ci_config:
if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
return job_list
@staticmethod
def _search_cases(test_case_path, case_filter=None):
"""
:param test_case_path: path that contains the test case folders
:param case_filter: filter for test cases
:return: filtered test case list
"""
test_methods = SearchCases.Search.search_test_cases(test_case_path)
return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict())
def _group_cases(self):
"""
separate all cases into groups according to the group rules. Each group will be executed by one CI job.
:return: test case groups.
"""
groups = []
for case in self.test_cases:
for group in groups:
# add to current group
if group.add_case(case):
break
else:
# create new group
groups.append(self.case_group(case))
return groups
@staticmethod
def _apply_bot_filter():
"""
we support customized CI tests via the bot.
here we process the filter set by the bot and return a filter which ``_search_cases`` accepts.
:return: filter for search test cases
"""
bot_filter = os.getenv("BOT_CASE_FILTER")
if bot_filter:
bot_filter = json.loads(bot_filter)
else:
bot_filter = dict()
return bot_filter
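# for example (an illustrative value, not defined by this commit), the bot may set:
#   BOT_CASE_FILTER='{"module": "unit test"}'
# json.loads turns it into {"module": "unit test"}, and ``_search_cases`` then
# keeps only the cases whose attributes match this filter.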
def assign_cases(self):
"""
separate test cases to groups and assign test cases to CI jobs.
:raise AssertionError: if it failed to assign any case to a CI job.
:return: None
"""
failed_to_assign = []
case_filter = self._apply_bot_filter()
self.test_cases = self._search_cases(self.test_case_path, case_filter)
test_groups = self._group_cases()
for group in test_groups:
for job in self.jobs:
if job.match_group(group):
job.assign_group(group)
break
else:
failed_to_assign.append(group)
assert not failed_to_assign
def output_configs(self, output_path):
"""
:param output_path: path to output config files for each CI job
:return: None
"""
if not os.path.exists(output_path):
os.makedirs(output_path)
for job in self.jobs:
job.output_config(output_path)

View File

@@ -51,6 +51,20 @@ import yaml
import TestCase
def _convert_to_lower_case(item):
"""
the bot filter is always a lower-case string.
this function converts all strings to lower case.
"""
if isinstance(item, (tuple, list)):
output = [_convert_to_lower_case(v) for v in item]
elif isinstance(item, str):
output = item.lower()
else:
output = item
return output
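# a quick example of the conversion:
#   _convert_to_lower_case(("SW_CPU_RESET", 1)) -> ["sw_cpu_reset", 1]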
def _filter_one_case(test_method, case_filter):
""" Apply filter for one case (the filter logic is the same as described in ``filter_test_cases``) """
filter_result = True
@@ -58,7 +72,8 @@ def _filter_one_case(test_method, case_filter):
if key in test_method.case_info:
# the filter key is both in case and filter
# we need to check if they match
filter_item, accepted_item = case_filter[key], test_method.case_info[key]
filter_item = _convert_to_lower_case(case_filter[key])
accepted_item = _convert_to_lower_case(test_method.case_info[key])
if isinstance(filter_item, (tuple, list)) \
and isinstance(accepted_item, (tuple, list)):
@@ -91,6 +106,7 @@ def filter_test_cases(test_methods, case_filter):
* if one is list/tuple, the other one is string/int, then check if string/int is in list/tuple
* if both are list/tuple, then check if they have common item
2. if only case attribute or filter have the key, filter succeed
3. strings are compared case-insensitively
for example, the following are match succeed scenarios
(the rule is symmetric, result is same if exchange values for user filter and case attribute):

View File

@@ -70,4 +70,4 @@ class Job(dict):
file_name = os.path.join(file_path, self["name"] + ".yml")
if "case group" in self:
with open(file_name, "w") as f:
yaml.dump(self["case group"].output(), f)
yaml.dump(self["case group"].output(), f, default_flow_style=False)

View File

@@ -20,7 +20,8 @@ TEST_CASE_PATTERN = {
"version": "v1 (2016-12-06)",
"test environment": "UT_T1_1",
"reset": "",
"expected result": "1. set succeed"
"expected result": "1. set succeed",
"cmd set": "test_unit_test_case",
}
CONFIG_FILE_PATTERN = {
@@ -78,10 +79,10 @@ class Parser(object):
name_addr = table.get_unsigned_int(section, test_addr, 4)
desc_addr = table.get_unsigned_int(section, test_addr + 4, 4)
file_name_addr = table.get_unsigned_int(section, test_addr + 12, 4)
function_count = table.get_unsigned_int(section, test_addr+20, 4)
name = table.get_string("any", name_addr)
desc = table.get_string("any", desc_addr)
file_name = table.get_string("any", file_name_addr)
tc = self.parse_one_test_case(name, desc, file_name, app_name)
# check for duplicated case names
@@ -100,7 +101,13 @@ class Parser(object):
self.test_env_tags[tc["test environment"]].append(tc["ID"])
else:
self.test_env_tags.update({tc["test environment"]: [tc["ID"]]})
# only add cases that need to be executed
if function_count > 1:
tc.update({"cmd set": "multiple_devices_case",
"child case num": function_count})
del tc['reset']
# only add cases that need to be executed
test_cases.append(tc)
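# e.g. a two-device case parsed here ends up carrying (illustrative values):
#   {"summary": "gpio master/slave test example",
#    "cmd set": "multiple_devices_case", "child case num": 2, ...}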
os.remove("section_table.tmp")
@@ -178,7 +185,6 @@ class Parser(object):
test_case.update({"Test App": self.APP_NAME_PREFIX + app_name,
"module": self.module_map[prop["module"]]['module'],
"CI ready": "No" if prop["ignore"] == "Yes" else "Yes",
"cmd set": ["IDFUnitTest/UnitTest", [name]],
"ID": tc_id,
"test point 2": prop["module"],
"steps": name,
@@ -262,4 +268,3 @@ def main():
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,384 @@
"""
Test script for unit test case.
"""
import re
import os
import sys
import time
import threading
# if we want to run test cases outside the `tiny-test-fw` folder,
# we need to insert the tiny-test-fw path into sys.path
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import Utility
from DUT import ExpectTimeout
from IDF.IDFApp import UT
UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
UT_TIMEOUT = 30
def format_test_case_config(test_case_data):
"""
convert the test case data to unified format.
We need the following info to run unit test cases:
1. unit test app config
2. test case name
3. test case reset info
The formatted case config is a dict, with UT app configs as keys. The value is a list of test cases.
Each test case is a dict with "name" and "reset" as keys. For example::
case_config = {
"default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
"psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
}
If no config is specified for a test case, the "default" config will be used.
:param test_case_data: string, list, or a dictionary list
:return: formatted data
"""
case_config = dict()
def parse_case(one_case_data):
""" parse and format one case """
def process_reset_list(reset_list):
# strip spaces and remove whitespace-only items
_output = list()
for _r in reset_list:
_data = _r.strip(" ")
if _data:
_output.append(_data)
return _output
_case = dict()
if isinstance(one_case_data, str):
_temp = one_case_data.split(" [reset=")
_case["name"] = _temp[0]
try:
_case["reset"] = process_reset_list(_temp[1][0:-1].split(","))
except IndexError:
_case["reset"] = list()
elif isinstance(one_case_data, dict):
_case = one_case_data.copy()
assert "name" in _case
if "reset" not in _case:
_case["reset"] = list()
else:
if isinstance(_case["reset"], str):
_case["reset"] = process_reset_list(_case["reset"].split(","))
else:
raise TypeError("Unsupported type when parsing unit test case")
if "config" not in _case:
_case["config"] = "default"
return _case
if not isinstance(test_case_data, list):
test_case_data = [test_case_data]
for case_data in test_case_data:
parsed_case = parse_case(case_data)
try:
case_config[parsed_case["config"]].append(parsed_case)
except KeyError:
case_config[parsed_case["config"]] = [parsed_case]
return case_config
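# a quick example of the conversion performed above:
#   format_test_case_config('UART init test [reset=SW_CPU_RESET]')
# returns:
#   {"default": [{"name": "UART init test",
#                 "reset": ["SW_CPU_RESET"], "config": "default"}]}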
@TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="unit_test",
execution_time=1, env_tag="UT_T1_1")
def test_unit_test_case(env, extra_data):
"""
extra_data can be one of three types of value
1. as string:
1. "case_name"
2. "case_name [reset=RESET_REASON]"
2. as dict:
1. with key like {"name": "Intr_alloc test, shared ints"}
2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
3. as list of string or dict:
[case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
:param extra_data: the case name or case list or case dictionary
:return: None
"""
case_config = format_test_case_config(extra_data)
# compile the patterns for expect only once
reset_pattern = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))")
exception_pattern = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
abort_pattern = re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)")
finish_pattern = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
# we don't want to stop on a failed case (except for special scenarios we can't handle);
# this list records the cases that failed during execution,
# and is checked before the test function exits to report the failures
failed_cases = []
for ut_config in case_config:
dut = env.get_dut("unit-test-app", app_path=ut_config)
dut.start_app()
for one_case in case_config[ut_config]:
dut.reset()
# the esptool ``run`` cmd takes quite a long time.
# while reset is in progress the serial port is closed, so the DUT could already have booted up before the serial port is opened.
# this could cause the boot-up print check to fail.
# so we input the cmd `-` and check for either the boot-up print or the test history,
# to determine if the DUT is ready to test.
dut.write("-", flush=False)
dut.expect_any(UT_APP_BOOT_UP_DONE,
"0 Tests 0 Failures 0 Ignored")
# run test case
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
exception_reset_list = []
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
test_finish = list()
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
test_finish.append(True)
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
def handle_exception_reset(data):
"""
just append data to the exception list.
the exception list will be checked in ``handle_reset_finish`` once reset has finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
assert not exception_reset_list
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
one_case_finish(not int(data[0]))
def handle_reset_finish(data):
""" reset happened and reboot finished """
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
.format(one_case["reset"], exception_reset_list),
color="orange")
one_case_finish(result)
while not test_finish:
try:
dut.expect_any((reset_pattern, handle_exception_reset), # reset pattern
(exception_pattern, handle_exception_reset), # exception pattern
(abort_pattern, handle_exception_reset), # abort pattern
(finish_pattern, handle_test_finish), # test finish pattern
(UT_APP_BOOT_UP_DONE, handle_reset_finish), # reboot finish pattern
timeout=UT_TIMEOUT)
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_case_finish(False)
break
# raise exception if any case fails
if failed_cases:
Utility.console_log("Failed Cases:", color="red")
for _case_name in failed_cases:
Utility.console_log("\t" + _case_name, color="red")
raise AssertionError("Unit Test Failed")
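# to run a single case on a local PC (mirroring the ``__main__`` example at the
# bottom of this file; the case name is illustrative, and TEST_FW_PATH must be
# set as handled at the top of this file):
#   test_unit_test_case(extra_data="UART init test")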
class Handler(threading.Thread):
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[(.+)\]!')
FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout=30):
self.dut = dut
self.sent_signal_list = sent_signal_list
self.lock = lock
self.parent_case_name = parent_case_name
self.child_case_name = ""
self.child_case_index = child_case_index + 1
self.finish = False
self.result = False
self.fail_name = None
self.timeout = timeout
threading.Thread.__init__(self, name="{} Handler".format(dut))
def run(self):
def get_child_case_name(data):
self.child_case_name = data[0]
time.sleep(1)
self.dut.write(str(self.child_case_index))
def one_device_case_finish(result):
""" one test finished, let expect loop break and log result """
self.finish = True
self.result = result
if not result:
self.fail_name = self.child_case_name
def device_wait_action(data):
start_time = time.time()
expected_signal = data[0]
while 1:
if time.time() > start_time + self.timeout:
Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
break
with self.lock:
if expected_signal in self.sent_signal_list:
self.dut.write(" ")
self.sent_signal_list.remove(expected_signal)
break
time.sleep(0.01)
def device_send_action(data):
with self.lock:
self.sent_signal_list.append(data[0].encode('utf-8'))
def handle_device_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + self.child_case_name, color="orange")
one_device_case_finish(not int(data[0]))
self.dut.reset()
self.dut.write("-", flush=False)
self.dut.expect_any(UT_APP_BOOT_UP_DONE, "0 Tests 0 Failures 0 Ignored")
time.sleep(1)
self.dut.write("\"{}\"".format(self.parent_case_name))
self.dut.expect("Running " + self.parent_case_name + "...")
while not self.finish:
try:
self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
(self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
(self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
(self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
timeout=UT_TIMEOUT)
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_device_case_finish(False)
break
def get_case_info(one_case):
parent_case = one_case["name"]
child_case_num = one_case["child case num"]
return parent_case, child_case_num
def get_dut(duts, env, name, ut_config):
if name in duts:
dut = duts[name]
else:
dut = env.get_dut(name, app_path=ut_config)
duts[name] = dut
dut.start_app()
return dut
def case_run(duts, ut_config, env, one_case, failed_cases):
lock = threading.RLock()
threads = []
send_signal_list = []
failed_device = []
result = True
parent_case, case_num = get_case_info(one_case)
for i in range(case_num):
dut = get_dut(duts, env, "dut%d" % i, ut_config)
threads.append(Handler(dut, send_signal_list, lock,
parent_case, i))
for thread in threads:
thread.setDaemon(True)
thread.start()
for thread in threads:
thread.join()
result = result and thread.result
if not thread.result:
failed_device.append(thread.fail_name)
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
@TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="master_slave_test_case", execution_time=1,
env_tag="UT_T2_1")
def multiple_devices_case(env, extra_data):
"""
extra_data can be one of two types of value
1. as dict:
e.g.
{"name": "gpio master/slave test example",
"child case num": 2,
"config": "release",
"env_tag": "UT_T2_1"}
2. as a list of dicts:
e.g.
[{"name": "gpio master/slave test example1",
"child case num": 2,
"config": "release",
"env_tag": "UT_T2_1"},
{"name": "gpio master/slave test example2",
"child case num": 2,
"config": "release",
"env_tag": "UT_T2_1"}]
"""
failed_cases = []
case_config = format_test_case_config(extra_data)
DUTS = {}
for ut_config in case_config:
for one_case in case_config[ut_config]:
case_run(DUTS, ut_config, env, one_case, failed_cases)
if failed_cases:
Utility.console_log("Failed Cases:", color="red")
for _case_name in failed_cases:
Utility.console_log("\t" + _case_name, color="red")
raise AssertionError("Unit Test Failed")
if __name__ == '__main__':
multiple_devices_case(extra_data={"name": "gpio master/slave test example",
"child case num": 2,
"config": "release",
"env_tag": "UT_T2_1"})