- git submodule update --init --recursive
assign_test:
- <<: *build_template
+ tags:
+ - assign_test
+ image: $CI_DOCKER_REGISTRY/ubuntu-test-env$BOT_DOCKER_IMAGE_TAG
stage: assign_test
# gitlab ci do not support match job with RegEx or wildcard now in dependencies.
# we have a lot build example jobs. now we don't use dependencies, just download all artificats of build stage.
+ dependencies:
+ - build_ssc_00
+ - build_ssc_01
+ - build_ssc_02
+ - build_esp_idf_tests
variables:
- UT_BIN_PATH: "tools/unit-test-app/output"
- OUTPUT_BIN_PATH: "test_bins/ESP32_IDF"
TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
EXAMPLE_CONFIG_OUTPUT_PATH: "$CI_PROJECT_DIR/examples/test_configs"
artifacts:
paths:
- - $OUTPUT_BIN_PATH
- components/idf_test/*/CIConfigs
- components/idf_test/*/TC.sqlite
- $EXAMPLE_CONFIG_OUTPUT_PATH
expire_in: 1 mos
before_script: *add_gitlab_key_before
script:
- # first move test bins together: test_bins/CHIP_SDK/TestApp/bin_files
- - mkdir -p $OUTPUT_BIN_PATH
- # copy and rename folder name to "UT_config"
- - for CONFIG in $(ls $UT_BIN_PATH); do cp -r "$UT_BIN_PATH/$CONFIG" "$OUTPUT_BIN_PATH/UT_$CONFIG"; done
- - cp -r SSC/ssc_bin/* $OUTPUT_BIN_PATH
# assign example tests
- python $TEST_FW_PATH/CIAssignExampleTest.py $IDF_PATH/examples $IDF_PATH/.gitlab-ci.yml $EXAMPLE_CONFIG_OUTPUT_PATH
+ # assign unit test cases
+ - python $TEST_FW_PATH/CIAssignUnitTest.py $IDF_PATH/components/idf_test/unit_test/TestCaseAll.yml $IDF_PATH/.gitlab-ci.yml $IDF_PATH/components/idf_test/unit_test/CIConfigs
# clone test script to assign tests
- git clone $TEST_SCRIPT_REPOSITORY
- cd auto_test_script
- python $CHECKOUT_REF_SCRIPT auto_test_script
- # assign unit test cases
- - python CIAssignTestCases.py -t $IDF_PATH/components/idf_test/unit_test -c $IDF_PATH/.gitlab-ci.yml -b $IDF_PATH/test_bins
# assgin integration test cases
- python CIAssignTestCases.py -t $IDF_PATH/components/idf_test/integration_test -c $IDF_PATH/.gitlab-ci.yml -b $IDF_PATH/SSC/ssc_bin
# template for unit test jobs
.unit_test_template: &unit_test_template
- <<: *test_template
- allow_failure: false
+ <<: *example_test_template
stage: unit_test
+ dependencies:
+ - assign_test
+ - build_esp_idf_tests
+ only:
+ refs:
+ - master
+ - /^release\/v/
+ - /^v\d+\.\d+(\.\d+)?($|-)/
+ - triggers
variables:
- LOCAL_ENV_CONFIG_PATH: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/ESP32_IDF"
- LOG_PATH: "$CI_PROJECT_DIR/$CI_COMMIT_SHA"
- TEST_CASE_FILE_PATH: "$CI_PROJECT_DIR/components/idf_test/unit_test"
- MODULE_UPDATE_FILE: "$CI_PROJECT_DIR/components/idf_test/ModuleDefinition.yml"
+ TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
+ TEST_CASE_PATH: "$CI_PROJECT_DIR/tools/unit-test-app"
CONFIG_FILE: "$CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/$CI_JOB_NAME.yml"
+ LOG_PATH: "$CI_PROJECT_DIR/TEST_LOGS"
+ ENV_FILE: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/EnvConfig.yml"
nvs_compatible_test:
<<: *test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_default
UT_001_02:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_default
UT_001_03:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_default
UT_001_04:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_default
UT_001_05:
<<: *unit_test_template
tags:
- ESP32_IDF
- - UT_T1_SDMODE
- - UT_default
+ - UT_T1_1
UT_001_06:
<<: *unit_test_template
tags:
- ESP32_IDF
- - UT_T1_SPIMODE
- - UT_default
+ - UT_T1_1
UT_001_07:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_default
UT_001_08:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_default
UT_001_09:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_default
-UT_002_01:
+UT_001_10:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_release
-UT_002_02:
+UT_001_11:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_release
-UT_002_03:
+UT_001_12:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_release
-UT_002_04:
+UT_001_13:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_release
-UT_002_05:
- <<: *unit_test_template
- tags:
- - ESP32_IDF
- - UT_T1_SDMODE
- - UT_release
-
-UT_002_06:
- <<: *unit_test_template
- tags:
- - ESP32_IDF
- - UT_T1_SPIMODE
- - UT_release
-
-UT_002_07:
+UT_001_14:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_release
-UT_002_08:
+UT_001_15:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_release
-UT_002_09:
+UT_001_16:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_release
-UT_003_01:
+UT_001_17:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_single_core
-UT_003_02:
+UT_001_18:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_single_core
-
-UT_003_03:
- <<: *unit_test_template
- tags:
- - ESP32_IDF
- - UT_T1_1
- - UT_single_core
-
-UT_003_04:
- <<: *unit_test_template
- tags:
- - ESP32_IDF
- - UT_T1_1
- - UT_single_core
-
-UT_003_05:
- <<: *unit_test_template
- tags:
- - ESP32_IDF
- - UT_T1_SDMODE
- - UT_single_core
-
-UT_003_06:
- <<: *unit_test_template
- tags:
- - ESP32_IDF
- - UT_T1_SPIMODE
- - UT_single_core
-UT_003_07:
+UT_001_19:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_single_core
-UT_003_08:
+UT_001_20:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_single_core
-UT_003_09:
+UT_001_21:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_single_core
-UT_004_01:
+UT_002_01:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_psram
+ - psram
-UT_004_02:
+UT_002_02:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_psram
+ - psram
-UT_004_03:
+UT_002_03:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_psram
+ - psram
-UT_004_04:
+UT_002_04:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_psram
+ - psram
-UT_004_05:
- <<: *unit_test_template
- tags:
- - ESP32_IDF
- - UT_T1_SDMODE
- - UT_psram
-
-UT_004_06:
- <<: *unit_test_template
- tags:
- - ESP32_IDF
- - UT_T1_SPIMODE
- - UT_psram
-
-UT_004_07:
+UT_002_05:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_psram
+ - psram
-UT_004_08:
+UT_002_06:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_psram
+ - psram
-UT_004_09:
+UT_002_07:
<<: *unit_test_template
tags:
- ESP32_IDF
- UT_T1_1
- - UT_psram
+ - psram
IT_001_01:
<<: *test_template
--- /dev/null
+menu "Unity test framework"
+
+config UNITY_FREERTOS_PRIORITY
+ int "Priority of Unity test task"
+ default 5
+
+config UNITY_FREERTOS_CPU
+ int "CPU to run Unity test task on"
+ default 0
+
+config UNITY_FREERTOS_STACK_SIZE
+ int "Stack size of Unity test task, in bytes"
+ default 8192
+
+endmenu
*/
void ref_clock_deinit();
+
/**
* @brief Get reference clock timestamp
* @return number of microseconds since the reference clock was initialized
*/
uint64_t ref_clock_get();
+
+/**
+ * @brief wait for signals.
+ *
+ * for multiple devices test cases, DUT might need to wait for other DUTs before continue testing.
+ * As all DUTs are independent, need user (or test script) interaction to make test synchronized.
+ *
+ * Here we provide signal functions for this.
+ * For example, we're testing GPIO, DUT1 has one pin connect to with DUT2.
+ * DUT2 will output high level and then DUT1 will read input.
+ * DUT1 should call `unity_wait_for_signal("output high level");` before it reads input.
+ * DUT2 should call `unity_send_signal("output high level");` after it finished setting output high level.
+ * According to the console logs:
+ *
+ * DUT1 console:
+ *
+ * ```
+ * Waiting for signal: [output high level]!
+ * Please press "Enter" key to once any board send this signal.
+ * ```
+ *
+ * DUT2 console:
+ *
+ * ```
+ * Send signal: [output high level]!
+ * ```
+ *
+ * Then we press Enter key on DUT1's console, DUT1 starts to read input and then test success.
+ *
+ * @param signal_name signal name which DUT expected to wait before proceed testing
+ */
+void unity_wait_for_signal(const char* signal_name);
+
+/**
+ * @brief DUT send signal.
+ *
+ * @param signal_name signal name which DUT send once it finished preparing.
+ */
+void unity_send_signal(const char* signal_name);
// Adapt Unity to our environment, disable FP support
#include <esp_err.h>
+#include <sdkconfig.h>
/* Some definitions applicable to Unity running in FreeRTOS */
-#define UNITY_FREERTOS_PRIORITY 5
-#define UNITY_FREERTOS_CPU 0
+#define UNITY_FREERTOS_PRIORITY CONFIG_UNITY_FREERTOS_PRIORITY
+#define UNITY_FREERTOS_CPU CONFIG_UNITY_FREERTOS_CPU
+#define UNITY_FREERTOS_STACK_SIZE CONFIG_UNITY_FREERTOS_STACK_SIZE
#define UNITY_EXCLUDE_FLOAT
#define UNITY_EXCLUDE_DOUBLE
#define UNITY_OUTPUT_FLUSH unity_flush
// Define helpers to register test cases from multiple files
-
#define UNITY_EXPAND2(a, b) a ## b
#define UNITY_EXPAND(a, b) UNITY_EXPAND2(a, b)
#define UNITY_TEST_UID(what) UNITY_EXPAND(what, __LINE__)
#define UNITY_TEST_REG_HELPER reg_helper ## UNITY_TEST_UID
#define UNITY_TEST_DESC_UID desc ## UNITY_TEST_UID
+
+
+// get count of __VA_ARGS__
+#define PP_NARG(...) \
+ PP_NARG_(__VA_ARGS__,PP_RSEQ_N())
+#define PP_NARG_(...) \
+ PP_ARG_N(__VA_ARGS__)
+#define PP_ARG_N( \
+ _1, _2, _3, _4, _5, _6, _7, _8, _9, N, ...) N
+#define PP_RSEQ_N() 9,8,7,6,5,4,3,2,1,0
+
+// support max 5 test func now
+#define FN_NAME_SET_1(a) {#a}
+#define FN_NAME_SET_2(a, b) {#a, #b}
+#define FN_NAME_SET_3(a, b, c) {#a, #b, #c}
+#define FN_NAME_SET_4(a, b, c, d) {#a, #b, #c, #d}
+#define FN_NAME_SET_5(a, b, c, d, e) {#a, #b, #c, #d, #e}
+
+#define FN_NAME_SET2(n) FN_NAME_SET_##n
+#define FN_NAME_SET(n, ...) FN_NAME_SET2(n)(__VA_ARGS__)
+
+#define UNITY_TEST_FN_SET(...) \
+ static test_func UNITY_TEST_UID(test_functions)[] = {__VA_ARGS__}; \
+ static const char* UNITY_TEST_UID(test_fn_name)[] = FN_NAME_SET(PP_NARG(__VA_ARGS__), __VA_ARGS__)
+
+
+typedef void (* test_func)(void);
+
struct test_desc_t
{
- const char* name;
- const char* desc;
- void (*fn)(void);
- const char* file;
- int line;
- struct test_desc_t* next;
+ const char* name;
+ const char* desc;
+ test_func* fn;
+ const char* file;
+ int line;
+ uint8_t test_fn_count;
+ const char ** test_fn_name;
+ struct test_desc_t* next;
};
void unity_testcase_register(struct test_desc_t* desc);
void unity_run_all_tests();
/* Test case macro, a-la CATCH framework.
- First argument is a free-form description,
+ First argument is a free-form description,
second argument is (by convention) a list of identifiers, each one in square brackets.
Identifiers are used to group related tests, or tests with specific properties.
Use like:
// test goes here
}
*/
+
#define TEST_CASE(name_, desc_) \
- static void UNITY_TEST_UID(test_func_) (void); \
- static void __attribute__((constructor)) UNITY_TEST_UID(test_reg_helper_) () \
- { \
- static struct test_desc_t UNITY_TEST_UID(test_desc_) = { \
- .name = name_, \
- .desc = desc_, \
- .fn = &UNITY_TEST_UID(test_func_), \
- .file = __FILE__, \
- .line = __LINE__, \
- .next = NULL \
- }; \
- unity_testcase_register( & UNITY_TEST_UID(test_desc_) ); \
- }\
- static void UNITY_TEST_UID(test_func_) (void)
+ static void UNITY_TEST_UID(test_func_) (void); \
+ static void __attribute__((constructor)) UNITY_TEST_UID(test_reg_helper_) () \
+ { \
+ static test_func test_fn_[] = {&UNITY_TEST_UID(test_func_)}; \
+ static struct test_desc_t UNITY_TEST_UID(test_desc_) = { \
+ .name = name_, \
+ .desc = desc_, \
+ .fn = test_fn_, \
+ .file = __FILE__, \
+ .line = __LINE__, \
+ .test_fn_count = 1, \
+ .test_fn_name = NULL, \
+ .next = NULL \
+ }; \
+ unity_testcase_register( & UNITY_TEST_UID(test_desc_) ); \
+ }\
+ static void UNITY_TEST_UID(test_func_) (void)
+
+
+/*
+ * Multiple stages test cases will handle the case that test steps are separated by DUT reset.
+ * e.g: we want to verify some function after SW reset, WDT reset or deep sleep reset.
+ *
+ * First argument is a free-form description,
+ * second argument is (by convention) a list of identifiers, each one in square brackets.
+ * subsequent arguments are names test functions separated by reset.
+ * e.g:
+ * TEST_CASE_MULTIPLE_STAGES("run light sleep after deep sleep","[sleep]", goto_deepsleep, light_sleep_after_deep_sleep_wakeup);
+ * */
+
+#define TEST_CASE_MULTIPLE_STAGES(name_, desc_, ...) \
+ UNITY_TEST_FN_SET(__VA_ARGS__); \
+ static void __attribute__((constructor)) UNITY_TEST_UID(test_reg_helper_) () \
+ { \
+ static struct test_desc_t UNITY_TEST_UID(test_desc_) = { \
+ .name = name_, \
+ .desc = desc_"[multi_stage]", \
+ .fn = UNITY_TEST_UID(test_functions), \
+ .file = __FILE__, \
+ .line = __LINE__, \
+ .test_fn_count = PP_NARG(__VA_ARGS__), \
+ .test_fn_name = UNITY_TEST_UID(test_fn_name), \
+ .next = NULL \
+ }; \
+ unity_testcase_register( & UNITY_TEST_UID(test_desc_) ); \
+ }
+
+/*
+ * First argument is a free-form description,
+ * second argument is (by convention) a list of identifiers, each one in square brackets.
+ * subsequent arguments are names of test functions for different DUTs
+ * e.g:
+ * TEST_CASE_MULTIPLE_DEVICES("master and slave spi","[spi][test_env=UT_T2_1]", master_test, slave_test);
+ * */
+
+#define TEST_CASE_MULTIPLE_DEVICES(name_, desc_, ...) \
+ UNITY_TEST_FN_SET(__VA_ARGS__); \
+ static void __attribute__((constructor)) UNITY_TEST_UID(test_reg_helper_) () \
+ { \
+ static struct test_desc_t UNITY_TEST_UID(test_desc_) = { \
+ .name = name_, \
+ .desc = desc_"[multi_device]", \
+ .fn = UNITY_TEST_UID(test_functions), \
+ .file = __FILE__, \
+ .line = __LINE__, \
+ .test_fn_count = PP_NARG(__VA_ARGS__), \
+ .test_fn_name = UNITY_TEST_UID(test_fn_name), \
+ .next = NULL \
+ }; \
+ unity_testcase_register( & UNITY_TEST_UID(test_desc_) ); \
+ }
+
/**
* Note: initialization of test_desc_t fields above has to be done exactly
* in the same order as the fields are declared in the structure.
// See the License for the specific language governing permissions and
// limitations under the License.
+#include <string.h>
#include "unity.h"
#include "test_utils.h"
+#include "rom/ets_sys.h"
+#include "rom/uart.h"
const esp_partition_t *get_test_data_partition()
{
TEST_ASSERT_NOT_NULL(result); /* means partition table set wrong */
return result;
}
+
+// wait user to send "Enter" key
+static void wait_user_control()
+{
+ char sign[5] = {0};
+ while(strlen(sign) == 0)
+ {
+ /* Flush anything already in the RX buffer */
+ while(uart_rx_one_char((uint8_t *) sign) == OK) {
+ }
+ /* Read line */
+ UartRxString((uint8_t*) sign, sizeof(sign) - 1);
+ }
+}
+
+// signal functions, used for sync between unity DUTs for multiple devices cases
+void unity_wait_for_signal(const char* signal_name)
+{
+ printf("Waiting for signal: [%s]!\n"
+ "Please press \"Enter\" key to once any board send this signal.\n", signal_name);
+ wait_user_control();
+}
+
+void unity_send_signal(const char* signal_name)
+{
+ printf("Send signal: [%s]!\n", signal_name);
+}
+
#include "esp_heap_trace.h"
#endif
-#define unity_printf ets_printf
-
// Pointers to the head and tail of linked list of test description structs:
static struct test_desc_t* s_unity_tests_first = NULL;
static struct test_desc_t* s_unity_tests_last = NULL;
}
}
+/* print the multiple function case name and its sub-menu
+ * e.g:
+ * (1) spi master/slave case
+ * (1)master case
+ * (2)slave case
+ * */
+static void print_multiple_function_test_menu(const struct test_desc_t* test_ms)
+ {
+ printf("%s\n", test_ms->name);
+ for (int i = 0; i < test_ms->test_fn_count; i++)
+ {
+ printf("\t(%d)\t\"%s\"\n", i+1, test_ms->test_fn_name[i]);
+ }
+ }
+
+void multiple_function_option(const struct test_desc_t* test_ms)
+{
+ int selection;
+ char cmdline[256] = {0};
+
+ print_multiple_function_test_menu(test_ms);
+ while(strlen(cmdline) == 0)
+ {
+ /* Flush anything already in the RX buffer */
+ while(uart_rx_one_char((uint8_t *) cmdline) == OK) {
+
+ }
+ UartRxString((uint8_t*) cmdline, sizeof(cmdline) - 1);
+ if(strlen(cmdline) == 0) {
+ /* if input was newline, print a new menu */
+ print_multiple_function_test_menu(test_ms);
+ }
+ }
+ selection = atoi((const char *) cmdline) - 1;
+ if(selection >= 0 && selection < test_ms->test_fn_count) {
+ UnityDefaultTestRun(test_ms->fn[selection], test_ms->name, test_ms->line);
+ } else {
+ printf("Invalid selection, your should input number 1-%d!", test_ms->test_fn_count);
+ }
+}
+
static void unity_run_single_test(const struct test_desc_t* test)
{
printf("Running %s...\n", test->name);
+ // Unit test runner expects to see test name before the test starts
+ fflush(stdout);
+ uart_tx_wait_idle(CONFIG_CONSOLE_UART_NUM);
+
Unity.TestFile = test->file;
Unity.CurrentDetail1 = test->desc;
- UnityDefaultTestRun(test->fn, test->name, test->line);
+ if(test->test_fn_count == 1) {
+ UnityDefaultTestRun(test->fn[0], test->name, test->line);
+ } else {
+ multiple_function_option(test);
+ }
}
static void unity_run_single_test_by_index(int index)
const struct test_desc_t* test;
for (test = s_unity_tests_first; test != NULL && index != 0; test = test->next, --index)
{
+
}
if (test != NULL)
{
{
unity_run_single_test(test);
}
- }
+ }
}
void unity_run_all_tests()
static int print_test_menu(void)
{
int test_counter = 0;
- unity_printf("\n\nHere's the test menu, pick your combo:\n");
+ printf("\n\nHere's the test menu, pick your combo:\n");
for (const struct test_desc_t* test = s_unity_tests_first;
test != NULL;
test = test->next, ++test_counter)
{
- unity_printf("(%d)\t\"%s\" %s\n", test_counter + 1, test->name, test->desc);
- }
- return test_counter;
+ printf("(%d)\t\"%s\" %s\n", test_counter + 1, test->name, test->desc);
+ if(test->test_fn_count > 1)
+ {
+ for (int i = 0; i < test->test_fn_count; i++)
+ {
+ printf("\t(%d)\t\"%s\"\n", i+1, test->test_fn_name[i]);
+ }
+ }
+ }
+ return test_counter;
}
static int get_test_count(void)
void unity_run_menu()
{
- unity_printf("\n\nPress ENTER to see the list of tests.\n");
+ printf("\n\nPress ENTER to see the list of tests.\n");
int test_count = get_test_count();
while (true)
{
print_test_menu();
}
}
+ /*use '-' to show test history. Need to do it before UNITY_BEGIN cleanup history */
+ if (cmdline[0] == '-')
+ {
+ UNITY_END();
+ continue;
+ }
UNITY_BEGIN();
#include "freertos/task.h"
#include "unity.h"
#include "unity_config.h"
+#include "tcpip_adapter.h"
void unityTask(void *pvParameters)
{
void app_main()
{
+ // TCP/IP adapter is initialized here because it leaks memory so the
+ // initialization in test cases would make the test fail because of leak.
+ tcpip_adapter_init();
+
// Note: if unpinning this task, change the way run times are calculated in
// unity_platform
- xTaskCreatePinnedToCore(unityTask, "unityTask", 8192, NULL,
+ xTaskCreatePinnedToCore(unityTask, "unityTask", UNITY_FREERTOS_STACK_SIZE, NULL,
UNITY_FREERTOS_PRIORITY, NULL, UNITY_FREERTOS_CPU);
}
--- /dev/null
+"psram": "CONFIG_SPIRAM_SUPPORT=y"
omitted: "UT_T1_1"
reset:
default: "POWERON_RESET"
- omitted: " "
\ No newline at end of file
+ omitted: " "
+multi_device:
+ default: "Yes"
+ omitted: "No"
+multi_stage:
+ default: "Yes"
+ omitted: "No"
+timeout:
+ default: 30
+ omitted: 30
from copy import deepcopy
import CreateSectionTable
-
TEST_CASE_PATTERN = {
"initial condition": "UTINIT1",
"SDK": "ESP32_IDF",
"version": "v1 (2016-12-06)",
"test environment": "UT_T1_1",
"reset": "",
- "expected result": "1. set succeed"
-}
-
-CONFIG_FILE_PATTERN = {
- "Config": {"execute count": 1, "execute order": "in order"},
- "DUT": [],
- "Filter": [{"Add": {"ID": []}}]
+ "expected result": "1. set succeed",
+ "cmd set": "test_unit_test_case",
+ "Test App": "UT",
}
# file path (relative to idf path)
TAG_DEF_FILE = os.path.join("tools", "unit-test-app", "tools", "TagDefinition.yml")
MODULE_DEF_FILE = os.path.join("tools", "unit-test-app", "tools", "ModuleDefinition.yml")
+ CONFIG_DEPENDENCY_FILE = os.path.join("tools", "unit-test-app", "tools", "ConfigDependency.yml")
MODULE_ARTIFACT_FILE = os.path.join("components", "idf_test", "ModuleDefinition.yml")
TEST_CASE_FILE = os.path.join("components", "idf_test", "unit_test", "TestCaseAll.yml")
- UT_BIN_FOLDER = os.path.join("tools", "unit-test-app", "builds")
+ UT_BIN_FOLDER = os.path.join("tools", "unit-test-app", "output")
ELF_FILE = "unit-test-app.elf"
- APP_NAME_PREFIX = "UT_"
+ SDKCONFIG_FILE = "sdkconfig"
def __init__(self, idf_path=os.getenv("IDF_PATH")):
self.test_env_tags = {}
self.idf_path = idf_path
self.tag_def = yaml.load(open(os.path.join(idf_path, self.TAG_DEF_FILE), "r"))
self.module_map = yaml.load(open(os.path.join(idf_path, self.MODULE_DEF_FILE), "r"))
+ self.config_dependency = yaml.load(open(os.path.join(idf_path, self.CONFIG_DEPENDENCY_FILE), "r"))
# used to check if duplicated test case names
self.test_case_names = set()
self.parsing_errors = []
- def parse_test_cases_from_elf(self, elf_file, app_name):
+ def parse_test_cases_for_one_config(self, config_output_folder, config_name):
"""
parse test cases from elf and save test cases need to be executed to unit test folder
- :param elf_file: elf file path
- :param app_name: built unit test app name
+ :param config_output_folder: build folder of this config
+ :param config_name: built unit test config name
"""
+ elf_file = os.path.join(config_output_folder, self.ELF_FILE)
subprocess.check_output('xtensa-esp32-elf-objdump -t {} | grep test_desc > case_address.tmp'.format(elf_file),
shell=True)
subprocess.check_output('xtensa-esp32-elf-objdump -s {} > section_table.tmp'.format(elf_file), shell=True)
table = CreateSectionTable.SectionTable("section_table.tmp")
+ tags = self.parse_tags(os.path.join(config_output_folder, self.SDKCONFIG_FILE))
test_cases = []
with open("case_address.tmp", "r") as f:
for line in f:
name_addr = table.get_unsigned_int(section, test_addr, 4)
desc_addr = table.get_unsigned_int(section, test_addr + 4, 4)
file_name_addr = table.get_unsigned_int(section, test_addr + 12, 4)
+ function_count = table.get_unsigned_int(section, test_addr+20, 4)
name = table.get_string("any", name_addr)
desc = table.get_string("any", desc_addr)
file_name = table.get_string("any", file_name_addr)
-
- tc = self.parse_one_test_case(name, desc, file_name, app_name)
+ tc = self.parse_one_test_case(name, desc, file_name, config_name, tags)
# check if duplicated case names
# we need to use it to select case,
# if duplicated IDs, Unity could select incorrect case to run
# and we need to check all cases no matter if it's going te be executed by CI
# also add app_name here, we allow same case for different apps
- if (tc["summary"] + app_name) in self.test_case_names:
+ if (tc["summary"] + config_name) in self.test_case_names:
self.parsing_errors.append("duplicated test case ID: " + tc["summary"])
else:
- self.test_case_names.add(tc["summary"] + app_name)
+ self.test_case_names.add(tc["summary"] + config_name)
if tc["CI ready"] == "Yes":
# update test env list and the cases of same env list
self.test_env_tags[tc["test environment"]].append(tc["ID"])
else:
self.test_env_tags.update({tc["test environment"]: [tc["ID"]]})
- # only add cases need to be executed
+
+ if function_count > 1:
+ tc.update({"child case num": function_count})
+
+ # only add cases need to be executed
test_cases.append(tc)
os.remove("section_table.tmp")
pass
return p
- def parse_one_test_case(self, name, description, file_name, app_name):
+ def parse_tags(self, sdkconfig_file):
+ """
+ Some test configs could requires different DUTs.
+ For example, if CONFIG_SPIRAM_SUPPORT is enabled, we need WROVER-Kit to run test.
+ This method will get tags for runners according to ConfigDependency.yml(maps tags to sdkconfig).
+
+ :param sdkconfig_file: sdkconfig file of the unit test config
+ :return: required tags for runners
+ """
+ required_tags = []
+ with open(sdkconfig_file, "r") as f:
+ configs_raw_data = f.read()
+ configs = configs_raw_data.splitlines(False)
+ for tag in self.config_dependency:
+ if self.config_dependency[tag] in configs:
+ required_tags.append(tag)
+ return required_tags
+
+ def parse_one_test_case(self, name, description, file_name, config_name, tags):
"""
parse one test case
:param name: test case name (summary)
:param description: test case description (tag string)
:param file_name: the file defines this test case
- :param app_name: built unit test app name
+ :param config_name: built unit test app name
+ :param tags: tags to select runners
:return: parsed test case
"""
prop = self.parse_case_properities(description)
- idf_path = os.getenv("IDF_PATH")
-
- # use relative file path to IDF_PATH, to make sure file path is consist
- relative_file_path = os.path.relpath(file_name, idf_path)
-
- file_name_hash = int(hashlib.sha256(relative_file_path).hexdigest(), base=16) % 1000
-
- if file_name_hash in self.file_name_cache:
- self.file_name_cache[file_name_hash] += 1
- else:
- self.file_name_cache[file_name_hash] = 1
-
- tc_id = "UT_%s_%s_%03d%02d" % (self.module_map[prop["module"]]['module abbr'],
- self.module_map[prop["module"]]['sub module abbr'],
- file_name_hash,
- self.file_name_cache[file_name_hash])
-
test_case = deepcopy(TEST_CASE_PATTERN)
- test_case.update({"Test App": self.APP_NAME_PREFIX + app_name,
+ test_case.update({"config": config_name,
"module": self.module_map[prop["module"]]['module'],
"CI ready": "No" if prop["ignore"] == "Yes" else "Yes",
- "cmd set": ["IDFUnitTest/UnitTest", [name]],
- "ID": tc_id,
+ "ID": name,
"test point 2": prop["module"],
"steps": name,
"test environment": prop["test_env"],
"reset": prop["reset"],
"sub module": self.module_map[prop["module"]]['sub module'],
- "summary": name})
+ "summary": name,
+ "multi_device": prop["multi_device"],
+ "multi_stage": prop["multi_stage"],
+ "timeout": int(prop["timeout"]),
+ "tags": tags})
return test_case
def dump_test_cases(self, test_cases):
""" parse test cases from multiple built unit test apps """
test_cases = []
- test_app_folder = os.path.join(self.idf_path, self.UT_BIN_FOLDER)
- test_apps = os.listdir(test_app_folder)
- for app in test_apps:
- elf_file = os.path.join(test_app_folder, app, self.ELF_FILE)
- if os.path.exists(elf_file):
- test_cases.extend(self.parse_test_cases_from_elf(elf_file, app))
-
+ output_folder = os.path.join(self.idf_path, self.UT_BIN_FOLDER)
+ test_configs = os.listdir(output_folder)
+ for config in test_configs:
+ config_output_folder = os.path.join(output_folder, config)
+ if os.path.exists(config_output_folder):
+ test_cases.extend(self.parse_test_cases_for_one_config(config_output_folder, config))
+ test_cases.sort(key=lambda x: x["config"] + x["summary"])
self.dump_test_cases(test_cases)
if __name__ == '__main__':
main()
-
--- /dev/null
+"""
+Test script for unit test case.
+"""
+
+import re
+import os
+import sys
+import time
+import threading
+
+# if we want to run test case outside `tiny-test-fw` folder,
+# we need to insert tiny-test-fw path into sys path
+test_fw_path = os.getenv("TEST_FW_PATH")
+if test_fw_path and test_fw_path not in sys.path:
+ sys.path.insert(0, test_fw_path)
+
+import TinyFW
+import IDF
+import Utility
+from DUT import ExpectTimeout
+from IDF.IDFApp import UT
+
+
+UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
+RESET_PATTERN = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))")
+EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
+ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-eA-E\d]{8} on core \d)")
+FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
+
+STARTUP_TIMEOUT = 10
+DUT_STARTUP_CHECK_RETRY_COUNT = 5
+TEST_HISTORY_CHECK_TIMEOUT = 1
+
+
+def format_test_case_config(test_case_data):
+ """
+ convert the test case data to unified format.
+ We need to following info to run unit test cases:
+
+ 1. unit test app config
+ 2. test case name
+ 3. test case reset info
+
+ the formatted case config is a dict, with ut app config as keys. The value is a list of test cases.
+ Each test case is a dict with "name" and "reset" as keys. For example::
+
+ case_config = {
+ "default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
+ "psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
+ }
+
+ If config is not specified for test case, then
+
+ :param test_case_data: string, list, or a dictionary list
+ :return: formatted data
+ """
+
+ case_config = dict()
+
+ def parse_case(one_case_data):
+ """ parse and format one case """
+
+ def process_reset_list(reset_list):
+ # strip space and remove white space only items
+ _output = list()
+ for _r in reset_list:
+ _data = _r.strip(" ")
+ if _data:
+ _output.append(_data)
+ return _output
+
+ _case = dict()
+ if isinstance(one_case_data, str):
+ _temp = one_case_data.split(" [reset=")
+ _case["name"] = _temp[0]
+ try:
+ _case["reset"] = process_reset_list(_temp[1][0:-1].split(","))
+ except IndexError:
+ _case["reset"] = list()
+ elif isinstance(one_case_data, dict):
+ _case = one_case_data.copy()
+ assert "name" in _case
+ if "reset" not in _case:
+ _case["reset"] = list()
+ else:
+ if isinstance(_case["reset"], str):
+ _case["reset"] = process_reset_list(_case["reset"].split(","))
+ else:
+ raise TypeError("Not supported type during parsing unit test case")
+
+ if "config" not in _case:
+ _case["config"] = "default"
+
+ return _case
+
+ if not isinstance(test_case_data, list):
+ test_case_data = [test_case_data]
+
+ for case_data in test_case_data:
+ parsed_case = parse_case(case_data)
+ try:
+ case_config[parsed_case["config"]].append(parsed_case)
+ except KeyError:
+ case_config[parsed_case["config"]] = [parsed_case]
+
+ return case_config
+
+
+def replace_app_bin(dut, name, new_app_bin):
+ if new_app_bin is None:
+ return
+ search_pattern = '/{}.bin'.format(name)
+ for i, config in enumerate(dut.download_config):
+ if config.endswith(search_pattern):
+ dut.download_config[i] = new_app_bin
+ Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O")
+ break
+
+
+def reset_dut(dut):
+ dut.reset()
+ # esptool ``run`` cmd takes quite long time.
+ # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
+ # this could cause checking bootup print failed.
+ # now use input cmd `-` and check test history to check if DUT is bootup.
+ # we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
+ for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
+ dut.write("-")
+ try:
+ dut.expect("0 Tests 0 Failures 0 Ignored", timeout=TEST_HISTORY_CHECK_TIMEOUT)
+ break
+ except ExpectTimeout:
+ pass
+ else:
+ raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
+
+
+def run_one_normal_case(dut, one_case, junit_test_case, failed_cases):
+
+ reset_dut(dut)
+
+ dut.start_capture_raw_data()
+ # run test case
+ dut.write("\"{}\"".format(one_case["name"]))
+ dut.expect("Running " + one_case["name"] + "...")
+
+ exception_reset_list = []
+
+ # we want to set this flag in callbacks (inner functions)
+ # use list here so we can use append to set this flag
+ test_finish = list()
+
+ # expect callbacks
+ def one_case_finish(result):
+ """ one test finished, let expect loop break and log result """
+ test_finish.append(True)
+ output = dut.stop_capture_raw_data()
+ if result:
+ Utility.console_log("Success: " + one_case["name"], color="green")
+ else:
+ failed_cases.append(one_case["name"])
+ Utility.console_log("Failed: " + one_case["name"], color="red")
+ junit_test_case.add_failure_info(output)
+
+ def handle_exception_reset(data):
+ """
+ just append data to exception list.
+ exception list will be checked in ``handle_reset_finish``, once reset finished.
+ """
+ exception_reset_list.append(data[0])
+
+ def handle_test_finish(data):
+ """ test finished without reset """
+ # in this scenario reset should not happen
+ assert not exception_reset_list
+ if int(data[1]):
+ # case ignored
+ Utility.console_log("Ignored: " + one_case["name"], color="orange")
+ junit_test_case.add_skipped_info("ignored")
+ one_case_finish(not int(data[0]))
+
+ def handle_reset_finish(data):
+ """ reset happened and reboot finished """
+ assert exception_reset_list # reboot but no exception/reset logged. should never happen
+ result = False
+ if len(one_case["reset"]) == len(exception_reset_list):
+ for i, exception in enumerate(exception_reset_list):
+ if one_case["reset"][i] not in exception:
+ break
+ else:
+ result = True
+ if not result:
+ err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
+ exception_reset_list)
+ Utility.console_log(err_msg, color="orange")
+ junit_test_case.add_error_info(err_msg)
+ one_case_finish(result)
+
+ while not test_finish:
+ try:
+ dut.expect_any((RESET_PATTERN, handle_exception_reset),
+ (EXCEPTION_PATTERN, handle_exception_reset),
+ (ABORT_PATTERN, handle_exception_reset),
+ (FINISH_PATTERN, handle_test_finish),
+ (UT_APP_BOOT_UP_DONE, handle_reset_finish),
+ timeout=one_case["timeout"])
+ except ExpectTimeout:
+ Utility.console_log("Timeout in expect", color="orange")
+ junit_test_case.add_error_info("timeout")
+ one_case_finish(False)
+ break
+
+
+@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
+def run_unit_test_cases(env, extra_data):
+ """
+ extra_data can be three types of value
+ 1. as string:
+ 1. "case_name"
+ 2. "case_name [reset=RESET_REASON]"
+ 2. as dict:
+ 1. with key like {"name": "Intr_alloc test, shared ints"}
+ 2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
+ 3. as list of string or dict:
+ [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
+
+ :param extra_data: the case name or case list or case dictionary
+ :return: None
+ """
+
+ case_config = format_test_case_config(extra_data)
+
+ # we don't want stop on failed case (unless some special scenarios we can't handle)
+ # this flag is used to log if any of the case failed during executing
+ # Before exit test function this flag is used to log if the case fails
+ failed_cases = []
+
+ for ut_config in case_config:
+ Utility.console_log("Running unit test for config: " + ut_config, "O")
+ dut = env.get_dut("unit-test-app", app_path=ut_config)
+ dut.start_app()
+ Utility.console_log("Download finished, start running test cases", "O")
+
+ for one_case in case_config[ut_config]:
+ # create junit report test case
+ junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
+ try:
+ run_one_normal_case(dut, one_case, junit_test_case, failed_cases)
+ TinyFW.JunitReport.test_case_finish(junit_test_case)
+ except Exception as e:
+ junit_test_case.add_error_info("Unexpected exception: " + str(e))
+ TinyFW.JunitReport.test_case_finish(junit_test_case)
+
+ # raise exception if any case fails
+ if failed_cases:
+ Utility.console_log("Failed Cases:", color="red")
+ for _case_name in failed_cases:
+ Utility.console_log("\t" + _case_name, color="red")
+ raise AssertionError("Unit Test Failed")
+
+
+class Handler(threading.Thread):
+
+ WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
+ SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[(.+)\]!')
+ FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
+
+ def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
+ self.dut = dut
+ self.sent_signal_list = sent_signal_list
+ self.lock = lock
+ self.parent_case_name = parent_case_name
+ self.child_case_name = ""
+ self.child_case_index = child_case_index + 1
+ self.finish = False
+ self.result = False
+ self.output = ""
+ self.fail_name = None
+ self.timeout = timeout
+ self.force_stop = threading.Event() # it show the running status
+
+ reset_dut(self.dut) # reset the board to make it start from begining
+
+ threading.Thread.__init__(self, name="{} Handler".format(dut))
+
+ def run(self):
+
+ self.dut.start_capture_raw_data()
+
+ def get_child_case_name(data):
+ self.child_case_name = data[0]
+ time.sleep(1)
+ self.dut.write(str(self.child_case_index))
+
+ def one_device_case_finish(result):
+ """ one test finished, let expect loop break and log result """
+ self.finish = True
+ self.result = result
+ self.output = "[{}]\n\n{}\n".format(self.child_case_name,
+ self.dut.stop_capture_raw_data())
+ if not result:
+ self.fail_name = self.child_case_name
+
+ def device_wait_action(data):
+ start_time = time.time()
+ expected_signal = data[0]
+ while 1:
+ if time.time() > start_time + self.timeout:
+ Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
+ break
+ with self.lock:
+ if expected_signal in self.sent_signal_list:
+ self.dut.write(" ")
+ self.sent_signal_list.remove(expected_signal)
+ break
+ time.sleep(0.01)
+
+ def device_send_action(data):
+ with self.lock:
+ self.sent_signal_list.append(data[0].encode('utf-8'))
+
+ def handle_device_test_finish(data):
+ """ test finished without reset """
+ # in this scenario reset should not happen
+ if int(data[1]):
+ # case ignored
+ Utility.console_log("Ignored: " + self.child_case_name, color="orange")
+ one_device_case_finish(not int(data[0]))
+
+ try:
+ time.sleep(1)
+ self.dut.write("\"{}\"".format(self.parent_case_name))
+ self.dut.expect("Running " + self.parent_case_name + "...")
+ except ExpectTimeout:
+ Utility.console_log("No case detected!", color="orange")
+ while not self.finish and not self.force_stop.isSet():
+ try:
+ self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
+ (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
+ (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
+ (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
+ timeout=self.timeout)
+ except ExpectTimeout:
+ Utility.console_log("Timeout in expect", color="orange")
+ one_device_case_finish(False)
+ break
+
+ def stop(self):
+ self.force_stop.set()
+
+
+def get_case_info(one_case):
+ parent_case = one_case["name"]
+ child_case_num = one_case["child case num"]
+ return parent_case, child_case_num
+
+
+def get_dut(duts, env, name, ut_config):
+ if name in duts:
+ dut = duts[name]
+ else:
+ dut = env.get_dut(name, app_path=ut_config)
+ duts[name] = dut
+ dut.start_app()
+ return dut
+
+
+def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case):
+ lock = threading.RLock()
+ threads = []
+ send_signal_list = []
+ result = True
+ parent_case, case_num = get_case_info(one_case)
+
+ for i in range(case_num):
+ dut = get_dut(duts, env, "dut%d" % i, ut_config)
+ threads.append(Handler(dut, send_signal_list, lock,
+ parent_case, i, one_case["timeout"]))
+ for thread in threads:
+ thread.setDaemon(True)
+ thread.start()
+ output = "Multiple Device Failed\n"
+ for thread in threads:
+ thread.join()
+ result = result and thread.result
+ output += thread.output
+ if not thread.result:
+ [thd.stop() for thd in threads]
+
+ if result:
+ Utility.console_log("Success: " + one_case["name"], color="green")
+ else:
+ failed_cases.append(one_case["name"])
+ junit_test_case.add_failure_info(output)
+ Utility.console_log("Failed: " + one_case["name"], color="red")
+
+
+@IDF.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True)
+def run_multiple_devices_cases(env, extra_data):
+ """
+ extra_data can be two types of value
+ 1. as dict:
+ e.g.
+ {"name": "gpio master/slave test example",
+ "child case num": 2,
+ "config": "release",
+ "env_tag": "UT_T2_1"}
+ 2. as list dict:
+ e.g.
+ [{"name": "gpio master/slave test example1",
+ "child case num": 2,
+ "config": "release",
+ "env_tag": "UT_T2_1"},
+ {"name": "gpio master/slave test example2",
+ "child case num": 2,
+ "config": "release",
+ "env_tag": "UT_T2_1"}]
+
+ """
+ failed_cases = []
+ case_config = format_test_case_config(extra_data)
+ duts = {}
+ for ut_config in case_config:
+ Utility.console_log("Running unit test for config: " + ut_config, "O")
+ for one_case in case_config[ut_config]:
+ junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
+ try:
+ run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case)
+ TinyFW.JunitReport.test_case_finish(junit_test_case)
+ except Exception as e:
+ junit_test_case.add_error_info("Unexpected exception: " + str(e))
+ TinyFW.JunitReport.test_case_finish(junit_test_case)
+
+ if failed_cases:
+ Utility.console_log("Failed Cases:", color="red")
+ for _case_name in failed_cases:
+ Utility.console_log("\t" + _case_name, color="red")
+ raise AssertionError("Unit Test Failed")
+
+
+def run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case):
+ reset_dut(dut)
+
+ dut.start_capture_raw_data()
+
+ exception_reset_list = []
+
+ for test_stage in range(one_case["child case num"]):
+ # select multi stage test case name
+ dut.write("\"{}\"".format(one_case["name"]))
+ dut.expect("Running " + one_case["name"] + "...")
+ # select test function for current stage
+ dut.write(str(test_stage + 1))
+
+ # we want to set this flag in callbacks (inner functions)
+ # use list here so we can use append to set this flag
+ stage_finish = list()
+
+ def last_stage():
+ return test_stage == one_case["child case num"] - 1
+
+ def check_reset():
+ if one_case["reset"]:
+ assert exception_reset_list # reboot but no exception/reset logged. should never happen
+ result = False
+ if len(one_case["reset"]) == len(exception_reset_list):
+ for i, exception in enumerate(exception_reset_list):
+ if one_case["reset"][i] not in exception:
+ break
+ else:
+ result = True
+ if not result:
+ err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
+ exception_reset_list)
+ Utility.console_log(err_msg, color="orange")
+ junit_test_case.add_error_info(err_msg)
+ else:
+ # we allow omit reset in multi stage cases
+ result = True
+ return result
+
+ # expect callbacks
+ def one_case_finish(result):
+ """ one test finished, let expect loop break and log result """
+ # handle test finish
+ result = result and check_reset()
+ output = dut.stop_capture_raw_data()
+ if result:
+ Utility.console_log("Success: " + one_case["name"], color="green")
+ else:
+ failed_cases.append(one_case["name"])
+ Utility.console_log("Failed: " + one_case["name"], color="red")
+ junit_test_case.add_failure_info(output)
+ stage_finish.append("break")
+
+ def handle_exception_reset(data):
+ """
+ just append data to exception list.
+ exception list will be checked in ``handle_reset_finish``, once reset finished.
+ """
+ exception_reset_list.append(data[0])
+
+ def handle_test_finish(data):
+ """ test finished without reset """
+ # in this scenario reset should not happen
+ if int(data[1]):
+ # case ignored
+ Utility.console_log("Ignored: " + one_case["name"], color="orange")
+ junit_test_case.add_skipped_info("ignored")
+ # only passed in last stage will be regarded as real pass
+ if last_stage():
+ one_case_finish(not int(data[0]))
+ else:
+ Utility.console_log("test finished before enter last stage", color="orange")
+ one_case_finish(False)
+
+ def handle_next_stage(data):
+ """ reboot finished. we goto next stage """
+ if last_stage():
+ # already last stage, should never goto next stage
+ Utility.console_log("didn't finish at last stage", color="orange")
+ one_case_finish(False)
+ else:
+ stage_finish.append("continue")
+
+ while not stage_finish:
+ try:
+ dut.expect_any((RESET_PATTERN, handle_exception_reset),
+ (EXCEPTION_PATTERN, handle_exception_reset),
+ (ABORT_PATTERN, handle_exception_reset),
+ (FINISH_PATTERN, handle_test_finish),
+ (UT_APP_BOOT_UP_DONE, handle_next_stage),
+ timeout=one_case["timeout"])
+ except ExpectTimeout:
+ Utility.console_log("Timeout in expect", color="orange")
+ one_case_finish(False)
+ break
+ if stage_finish[0] == "break":
+ # test breaks on current stage
+ break
+
+
+@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
+def run_multiple_stage_cases(env, extra_data):
+ """
+ extra_data can be 2 types of value
+ 1. as dict: Mandantory keys: "name" and "child case num", optional keys: "reset" and others
+ 3. as list of string or dict:
+ [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
+
+ :param extra_data: the case name or case list or case dictionary
+ :return: None
+ """
+
+ case_config = format_test_case_config(extra_data)
+
+ # we don't want stop on failed case (unless some special scenarios we can't handle)
+ # this flag is used to log if any of the case failed during executing
+ # Before exit test function this flag is used to log if the case fails
+ failed_cases = []
+
+ for ut_config in case_config:
+ Utility.console_log("Running unit test for config: " + ut_config, "O")
+ dut = env.get_dut("unit-test-app", app_path=ut_config)
+ dut.start_app()
+
+ for one_case in case_config[ut_config]:
+ junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
+ try:
+ run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case)
+ TinyFW.JunitReport.test_case_finish(junit_test_case)
+ except Exception as e:
+ junit_test_case.add_error_info("Unexpected exception: " + str(e))
+ TinyFW.JunitReport.test_case_finish(junit_test_case)
+
+ # raise exception if any case fails
+ if failed_cases:
+ Utility.console_log("Failed Cases:", color="red")
+ for _case_name in failed_cases:
+ Utility.console_log("\t" + _case_name, color="red")
+ raise AssertionError("Unit Test Failed")
+
+
+if __name__ == '__main__':
+ run_multiple_devices_cases(extra_data={"name": "gpio master/slave test example",
+ "child case num": 2,
+ "config": "release",
+ "env_tag": "UT_T2_1"})