]> granicus.if.org Git - esp-idf/commitdiff
freertos: Add critical sections to queue sets.
authorDarian Leung <darian@espressif.com>
Wed, 11 Jul 2018 12:50:43 +0000 (20:50 +0800)
committerDarian Leung <darian@espressif.com>
Fri, 13 Jul 2018 05:58:54 +0000 (13:58 +0800)
Queue sets are not SMP safe. This commit adds
critical sections to queue sets. Unit tests for
queue sets have also been added.

components/freertos/queue.c
components/freertos/test/test_queuesets.c [new file with mode: 0644]

index c5c02c1a9889ab0b8a2893795aaa730795cc7f37..a3a14931a62b9a558cd63e63d6389e45d4a7e7b3 100644 (file)
@@ -2458,8 +2458,7 @@ Queue_t * const pxQueue = ( Queue_t * ) xQueue;
        {
        BaseType_t xReturn;
 
-//ToDo: figure out locking
-//             taskENTER_CRITICAL(&pxQueue->mux);
+               taskENTER_CRITICAL(&(((Queue_t * )xQueueOrSemaphore)->mux));
                {
                        if( ( ( Queue_t * ) xQueueOrSemaphore )->pxQueueSetContainer != NULL )
                        {
@@ -2478,7 +2477,7 @@ Queue_t * const pxQueue = ( Queue_t * ) xQueue;
                                xReturn = pdPASS;
                        }
                }
-//             taskEXIT_CRITICAL(&pxQueue->mux);
+               taskEXIT_CRITICAL(&(((Queue_t * )xQueueOrSemaphore)->mux));
 
                return xReturn;
        }
@@ -2507,12 +2506,12 @@ Queue_t * const pxQueue = ( Queue_t * ) xQueue;
                }
                else
                {
-//                     taskENTER_CRITICAL(&pxQueue->mux);
+                       taskENTER_CRITICAL(&(pxQueueOrSemaphore->mux));
                        {
                                /* The queue is no longer contained in the set. */
                                pxQueueOrSemaphore->pxQueueSetContainer = NULL;
                        }
-//                     taskEXIT_CRITICAL(&pxQueue->mux);
+                       taskEXIT_CRITICAL(&(pxQueueOrSemaphore->mux));
                        xReturn = pdPASS;
                }
 
@@ -2555,9 +2554,15 @@ Queue_t * const pxQueue = ( Queue_t * ) xQueue;
        Queue_t *pxQueueSetContainer = pxQueue->pxQueueSetContainer;
        BaseType_t xReturn = pdFALSE;
 
-               /* This function must be called form a critical section. */
+               /*
+                * This function is called with a Queue's / Semaphore's spinlock already
+                * acquired. Acquiring the Queue set's spinlock is still necessary.
+                */
 
                configASSERT( pxQueueSetContainer );
+
+               //Acquire the Queue set's spinlock
+               portENTER_CRITICAL(&(pxQueueSetContainer->mux));
                configASSERT( pxQueueSetContainer->uxMessagesWaiting < pxQueueSetContainer->uxLength );
 
                if( pxQueueSetContainer->uxMessagesWaiting < pxQueueSetContainer->uxLength )
@@ -2588,6 +2593,9 @@ Queue_t * const pxQueue = ( Queue_t * ) xQueue;
                        mtCOVERAGE_TEST_MARKER();
                }
 
+               //Release the Queue set's spinlock
+               portEXIT_CRITICAL(&(pxQueueSetContainer->mux));
+
                return xReturn;
        }
 
diff --git a/components/freertos/test/test_queuesets.c b/components/freertos/test/test_queuesets.c
new file mode 100644 (file)
index 0000000..00e440d
--- /dev/null
@@ -0,0 +1,137 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include "freertos/FreeRTOS.h"
+#include "freertos/task.h"
+#include "freertos/queue.h"
+#include "freertos/semphr.h"
+#include "unity.h"
+
+/*
+ * Basic queue set tests. Multiple queues are added to a queue set then each
+ * queue is filled in a sequential order. The members returned from the queue
+ * set must adhered to the order in which the queues were filled.
+ */
+#define NO_OF_QUEUES            5
+#define QUEUE_LEN               4
+#define ITEM_SIZE               sizeof(uint32_t)
+
+static QueueHandle_t handles[NO_OF_QUEUES];
+static QueueSetHandle_t set_handle;
+
+TEST_CASE("Test Queue sets", "[freertos]")
+{
+    //Create queue set, queues, and add queues to queue set
+    set_handle = xQueueCreateSet(NO_OF_QUEUES * QUEUE_LEN);
+    for (int i = 0; i < NO_OF_QUEUES; i++) {
+        handles[i] = xQueueCreate(QUEUE_LEN, ITEM_SIZE);
+        TEST_ASSERT_MESSAGE(handles[i] != NULL, "Failed to create queue");
+        TEST_ASSERT_MESSAGE(xQueueAddToSet(handles[i], set_handle) == pdPASS, "Failed to add to queue set");
+    }
+
+    //Fill queue set via filling each queue
+    for (int i = 0; i < NO_OF_QUEUES; i++) {
+        for (int j = 0; j < QUEUE_LEN; j++) {
+            uint32_t item_num = (i * QUEUE_LEN) + j;
+            TEST_ASSERT_MESSAGE(xQueueSendToBack(handles[i], &item_num, portMAX_DELAY) == pdTRUE, "Failed to send to queue");
+        }
+    }
+
+    //Check queue set is notified in correct order
+    for (int i = 0; i < NO_OF_QUEUES; i++) {
+        for (int j = 0; j < QUEUE_LEN; j++) {
+            QueueSetMemberHandle_t member = xQueueSelectFromSet(set_handle, portMAX_DELAY);
+            TEST_ASSERT_EQUAL_MESSAGE(handles[i], member, "Incorrect queue set member returned");
+            uint32_t item;
+            xQueueReceive((QueueHandle_t)member, &item, 0);
+            TEST_ASSERT_EQUAL_MESSAGE(((i * QUEUE_LEN) + j), item, "Incorrect item value");
+        }
+    }
+
+    //Remove queues from queue set and delete queues
+    for (int i = 0; i < NO_OF_QUEUES; i++) {
+        TEST_ASSERT_MESSAGE(xQueueRemoveFromSet(handles[i], set_handle), "Failed to remove from queue set");
+        vQueueDelete(handles[i]);
+    }
+    vQueueDelete(set_handle);
+}
+
+/*
+ * Queue set thread safety test. Test the SMP thread safety by adding two queues
+ * to a queue set and have a task on each core send to the queues simultaneously.
+ * Check returned queue set members are valid.
+ */
+#ifndef CONFIG_FREERTOS_UNICORE
+static volatile bool sync_flags[portNUM_PROCESSORS];
+static SemaphoreHandle_t sync_sem;
+
+static void send_task(void *arg)
+{
+    QueueHandle_t queue = (QueueHandle_t)arg;
+
+    //Wait until task on the other core starts running
+    xSemaphoreTake(sync_sem, portMAX_DELAY);
+    sync_flags[xPortGetCoreID()] = true;
+    while (!sync_flags[!xPortGetCoreID()]) {
+        ;
+    }
+
+    //Fill queue
+    for (int i = 0; i < QUEUE_LEN; i++) {
+        uint32_t item = i;
+        xQueueSendToBack(queue, &item, portMAX_DELAY);
+    }
+
+    xSemaphoreGive(sync_sem);
+    vTaskDelete(NULL);
+}
+
+TEST_CASE("Test Queue sets thread safety", "[freertos]")
+{
+    //Create queue set, queues, and a send task on each core
+    sync_sem = xSemaphoreCreateCounting(portNUM_PROCESSORS, 0);
+    QueueHandle_t queue_handles[portNUM_PROCESSORS];
+    QueueSetHandle_t queueset_handle = xQueueCreateSet(portNUM_PROCESSORS * QUEUE_LEN);
+    for (int i = 0; i < portNUM_PROCESSORS; i++) {
+        sync_flags[i] = false;
+        queue_handles[i] = xQueueCreate(QUEUE_LEN, ITEM_SIZE);
+        TEST_ASSERT_MESSAGE(xQueueAddToSet(queue_handles[i], queueset_handle) == pdPASS, "Failed to add to queue set");
+        xTaskCreatePinnedToCore(send_task, "send", 2048, (void *)queue_handles[i], 10, NULL, i);
+    }
+
+    //Start both send tasks
+    portDISABLE_INTERRUPTS();
+    for (int i = 0; i < portNUM_PROCESSORS; i++) {
+        xSemaphoreGive(sync_sem);
+    }
+    portENABLE_INTERRUPTS();
+    vTaskDelay(2);
+
+    //Check returned queue set members are valid
+    uint32_t expect_0 = 0;
+    uint32_t expect_1 = 0;
+    for (int i = 0; i < (portNUM_PROCESSORS * QUEUE_LEN); i++) {
+        QueueSetMemberHandle_t member = xQueueSelectFromSet(queueset_handle, portMAX_DELAY);
+        uint32_t item;
+        if (member == queue_handles[0]) {
+            xQueueReceive((QueueHandle_t)member, &item, 0);
+            TEST_ASSERT_EQUAL_MESSAGE(expect_0, item, "Incorrect item value");
+            expect_0++;
+        } else if (member == queue_handles[1]) {
+            xQueueReceive((QueueHandle_t)member, &item, 0);
+            TEST_ASSERT_EQUAL_MESSAGE(expect_1, item, "Incorrect item value");
+            expect_1++;
+        } else {
+            TEST_ASSERT_MESSAGE(0, "Incorrect queue set member returned");
+        }
+    }
+
+    for (int i = 0; i < portNUM_PROCESSORS; i++) {
+        xSemaphoreTake(sync_sem, portMAX_DELAY);
+    }
+    for (int i = 0; i < portNUM_PROCESSORS; i++) {
+        xQueueRemoveFromSet(queueset_handle, handles[i]);
+        vQueueDelete(queue_handles[i]);
+    }
+    vQueueDelete(queueset_handle);
+}
+#endif