]> granicus.if.org Git - libvpx/commitdiff
Amend and improve VP8 multithreading implementation
authorYunqing Wang <yunqingwang@google.com>
Thu, 7 Jan 2016 02:27:37 +0000 (18:27 -0800)
committerYunqing Wang <yunqingwang@google.com>
Fri, 8 Jan 2016 19:59:49 +0000 (11:59 -0800)
There are flaws in current implementation of VP8 multithreading encoder
and decoder as reported in the following issue:
https://code.google.com/p/chromium/issues/detail?id=158922

Although the data race warnings are harmless, and wouldn't cause real
problems while encoding and decoding videos, it is better to fix the
warnings so that VP8 code could pass the TSan test.

To synchronize the thread-shared data access and maintain the speed
(i.e. decoding speed), use multiple mutexes based on mb_rows to reduce
the number of synchronizations needed, make the reads and writes of
the shared data protected, and reduce the number of mb_col writes by
nsync times.

The decoder speed tests showed < 3% speed loss while using 2 ~ 4
threads.

Change-Id: Ie296defffcd86a693188b668270d811964227882

vp8/common/threading.h
vp8/decoder/onyxd_int.h
vp8/decoder/threading.c
vp8/encoder/encodeframe.c
vp8/encoder/ethreading.c
vp8/encoder/onyx_if.c
vp8/encoder/onyx_int.h

index 01c82dbb805b8fff75636bcb9ff8a464f4c95088..b2e6ded3d89aff30279beecef21b31abbe0df0ce 100644 (file)
@@ -185,6 +185,47 @@ static inline int sem_destroy(sem_t * sem)
 #define x86_pause_hint()
 #endif
 
+#include "vpx_util/vpx_thread.h"
+
+static INLINE void mutex_lock(pthread_mutex_t *const mutex) {
+    const int kMaxTryLocks = 4000;
+    int locked = 0;
+    int i;
+
+    for (i = 0; i < kMaxTryLocks; ++i) {
+        if (!pthread_mutex_trylock(mutex)) {
+            locked = 1;
+            break;
+        }
+    }
+
+    if (!locked)
+        pthread_mutex_lock(mutex);
+}
+
+static INLINE int protected_read(pthread_mutex_t *const mutex, const int *p) {
+    int ret;
+    mutex_lock(mutex);
+    ret = *p;
+    pthread_mutex_unlock(mutex);
+    return ret;
+}
+
+static INLINE void sync_read(pthread_mutex_t *const mutex, int mb_col,
+                             const int *last_row_current_mb_col,
+                             const int nsync) {
+    while (mb_col > (protected_read(mutex, last_row_current_mb_col) - nsync)) {
+        x86_pause_hint();
+        thread_sleep(0);
+    }
+}
+
+static INLINE void protected_write(pthread_mutex_t *mutex, int *p, int v) {
+    mutex_lock(mutex);
+    *p = v;
+    pthread_mutex_unlock(mutex);
+}
+
 #endif /* CONFIG_OS_SUPPORT && CONFIG_MULTITHREAD */
 
 #ifdef __cplusplus
index aa2cc57f7b18d3c180dacd83be3a417746192b95..313fe01c0773dfbfa9c2866267c73877e9ec7ebb 100644 (file)
@@ -81,7 +81,7 @@ typedef struct VP8D_COMP
 #if CONFIG_MULTITHREAD
     /* variable for threading */
 
-    volatile int b_multithreaded_rd;
+    int b_multithreaded_rd;
     int max_threads;
     int current_mb_col_main;
     unsigned int decoding_thread_count;
@@ -90,6 +90,8 @@ typedef struct VP8D_COMP
     int mt_baseline_filter_level[MAX_MB_SEGMENTS];
     int sync_range;
     int *mt_current_mb_col;                  /* Each row remembers its already decoded column. */
+    pthread_mutex_t *pmutex;
+    pthread_mutex_t mt_mutex;                /* mutex for b_multithreaded_rd */
 
     unsigned char **mt_yabove_row;           /* mb_rows x width */
     unsigned char **mt_uabove_row;
index 7c7184c78aab5e8fd65be4dd0b60361e8dad15ad..97979e3b2fd005aab0975d70227715f858fa60d8 100644 (file)
@@ -52,9 +52,6 @@ static void setup_decoding_thread_data(VP8D_COMP *pbi, MACROBLOCKD *xd, MB_ROW_D
         mbd->subpixel_predict8x8     = xd->subpixel_predict8x8;
         mbd->subpixel_predict16x16   = xd->subpixel_predict16x16;
 
-        mbd->mode_info_context = pc->mi   + pc->mode_info_stride * (i + 1);
-        mbd->mode_info_stride  = pc->mode_info_stride;
-
         mbd->frame_type = pc->frame_type;
         mbd->pre = xd->pre;
         mbd->dst = xd->dst;
@@ -298,8 +295,8 @@ static void mt_decode_macroblock(VP8D_COMP *pbi, MACROBLOCKD *xd,
 
 static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
 {
-    volatile const int *last_row_current_mb_col;
-    volatile int *current_mb_col;
+    const int *last_row_current_mb_col;
+    int *current_mb_col;
     int mb_row;
     VP8_COMMON *pc = &pbi->common;
     const int nsync = pbi->sync_range;
@@ -337,6 +334,9 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
 
     xd->up_available = (start_mb_row != 0);
 
+    xd->mode_info_context = pc->mi + pc->mode_info_stride * start_mb_row;
+    xd->mode_info_stride = pc->mode_info_stride;
+
     for (mb_row = start_mb_row; mb_row < pc->mb_rows; mb_row += (pbi->decoding_thread_count + 1))
     {
        int recon_yoffset, recon_uvoffset;
@@ -405,17 +405,15 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
                                  xd->dst.uv_stride);
        }
 
-       for (mb_col = 0; mb_col < pc->mb_cols; mb_col++)
-       {
-           *current_mb_col = mb_col - 1;
+       for (mb_col = 0; mb_col < pc->mb_cols; mb_col++) {
+           if (((mb_col - 1) % nsync) == 0) {
+               pthread_mutex_t *mutex = &pbi->pmutex[mb_row];
+               protected_write(mutex, current_mb_col, mb_col - 1);
+           }
 
-           if ((mb_col & (nsync - 1)) == 0)
-           {
-               while (mb_col > (*last_row_current_mb_col - nsync))
-               {
-                   x86_pause_hint();
-                   thread_sleep(0);
-               }
+           if (mb_row && !(mb_col & (nsync - 1))) {
+               pthread_mutex_t *mutex = &pbi->pmutex[mb_row-1];
+               sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
            }
 
            /* Distance of MB to the various image edges.
@@ -604,7 +602,7 @@ static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
                              xd->dst.u_buffer + 8, xd->dst.v_buffer + 8);
 
        /* last MB of row is ready just after extension is done */
-       *current_mb_col = mb_col + nsync;
+       protected_write(&pbi->pmutex[mb_row], current_mb_col, mb_col + nsync);
 
        ++xd->mode_info_context;      /* skip prediction column */
        xd->up_available = 1;
@@ -629,12 +627,12 @@ static THREAD_FUNCTION thread_decoding_proc(void *p_data)
 
     while (1)
     {
-        if (pbi->b_multithreaded_rd == 0)
+        if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0)
             break;
 
         if (sem_wait(&pbi->h_event_start_decoding[ithread]) == 0)
         {
-            if (pbi->b_multithreaded_rd == 0)
+            if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0)
                 break;
             else
             {
@@ -657,6 +655,7 @@ void vp8_decoder_create_threads(VP8D_COMP *pbi)
 
     pbi->b_multithreaded_rd = 0;
     pbi->allocated_decoding_thread_count = 0;
+    pthread_mutex_init(&pbi->mt_mutex, NULL);
 
     /* limit decoding threads to the max number of token partitions */
     core_count = (pbi->max_threads > 8) ? 8 : pbi->max_threads;
@@ -699,8 +698,17 @@ void vp8mt_de_alloc_temp_buffers(VP8D_COMP *pbi, int mb_rows)
 {
     int i;
 
-    if (pbi->b_multithreaded_rd)
+    if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
     {
+        /* De-allocate mutex */
+        if (pbi->pmutex != NULL) {
+            for (i = 0; i < mb_rows; i++) {
+                pthread_mutex_destroy(&pbi->pmutex[i]);
+            }
+            vpx_free(pbi->pmutex);
+            pbi->pmutex = NULL;
+        }
+
             vpx_free(pbi->mt_current_mb_col);
             pbi->mt_current_mb_col = NULL ;
 
@@ -781,7 +789,7 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows)
     int i;
     int uv_width;
 
-    if (pbi->b_multithreaded_rd)
+    if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
     {
         vp8mt_de_alloc_temp_buffers(pbi, prev_mb_rows);
 
@@ -796,6 +804,15 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows)
 
         uv_width = width >>1;
 
+        /* Allocate mutex */
+        CHECK_MEM_ERROR(pbi->pmutex, vpx_malloc(sizeof(*pbi->pmutex) *
+                                                pc->mb_rows));
+        if (pbi->pmutex) {
+            for (i = 0; i < pc->mb_rows; i++) {
+                pthread_mutex_init(&pbi->pmutex[i], NULL);
+            }
+        }
+
         /* Allocate an int for each mb row. */
         CALLOC_ARRAY(pbi->mt_current_mb_col, pc->mb_rows);
 
@@ -831,11 +848,11 @@ void vp8mt_alloc_temp_buffers(VP8D_COMP *pbi, int width, int prev_mb_rows)
 void vp8_decoder_remove_threads(VP8D_COMP *pbi)
 {
     /* shutdown MB Decoding thread; */
-    if (pbi->b_multithreaded_rd)
+    if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
     {
         int i;
 
-        pbi->b_multithreaded_rd = 0;
+        protected_write(&pbi->mt_mutex, &pbi->b_multithreaded_rd, 0);
 
         /* allow all threads to exit */
         for (i = 0; i < pbi->allocated_decoding_thread_count; i++)
@@ -863,6 +880,7 @@ void vp8_decoder_remove_threads(VP8D_COMP *pbi)
             vpx_free(pbi->de_thread_data);
             pbi->de_thread_data = NULL;
     }
+    pthread_mutex_destroy(&pbi->mt_mutex);
 }
 
 void vp8mt_decode_mb_rows( VP8D_COMP *pbi, MACROBLOCKD *xd)
index b0aaa2f0bf0cf7ff8c339353692c0a9055306909..9b05cd1fcd0371489ff4e4195cd3602e546bec19 100644 (file)
@@ -386,8 +386,8 @@ void encode_mb_row(VP8_COMP *cpi,
 #if CONFIG_MULTITHREAD
     const int nsync = cpi->mt_sync_range;
     const int rightmost_col = cm->mb_cols + nsync;
-    volatile const int *last_row_current_mb_col;
-    volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
+    const int *last_row_current_mb_col;
+    int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
 
     if ((cpi->b_multi_threaded != 0) && (mb_row != 0))
         last_row_current_mb_col = &cpi->mt_current_mb_col[mb_row - 1];
@@ -461,17 +461,15 @@ void encode_mb_row(VP8_COMP *cpi,
         vp8_copy_mem16x16(x->src.y_buffer, x->src.y_stride, x->thismb, 16);
 
 #if CONFIG_MULTITHREAD
-        if (cpi->b_multi_threaded != 0)
-        {
-            *current_mb_col = mb_col - 1; /* set previous MB done */
+        if (cpi->b_multi_threaded != 0) {
+            if (((mb_col - 1) % nsync) == 0) {
+                pthread_mutex_t *mutex = &cpi->pmutex[mb_row];
+                protected_write(mutex, current_mb_col, mb_col - 1);
+            }
 
-            if ((mb_col & (nsync - 1)) == 0)
-            {
-                while (mb_col > (*last_row_current_mb_col - nsync))
-                {
-                    x86_pause_hint();
-                    thread_sleep(0);
-                }
+            if (mb_row && !(mb_col & (nsync - 1))) {
+                pthread_mutex_t *mutex = &cpi->pmutex[mb_row-1];
+                sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
             }
         }
 #endif
@@ -616,7 +614,7 @@ void encode_mb_row(VP8_COMP *cpi,
 
 #if CONFIG_MULTITHREAD
     if (cpi->b_multi_threaded != 0)
-        *current_mb_col = rightmost_col;
+        protected_write(&cpi->pmutex[mb_row], current_mb_col, rightmost_col);
 #endif
 
     /* this is to account for the border */
index 4e234ccd58bf2618955d97a334dd4f40cf748191..4f689c4bc7ea37a9e19f8256d3811702bf46eed1 100644 (file)
@@ -26,12 +26,13 @@ static THREAD_FUNCTION thread_loopfilter(void *p_data)
 
     while (1)
     {
-        if (cpi->b_multi_threaded == 0)
+        if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
             break;
 
         if (sem_wait(&cpi->h_event_start_lpf) == 0)
         {
-            if (cpi->b_multi_threaded == 0) /* we're shutting down */
+            /* we're shutting down */
+            if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
                 break;
 
             vp8_loopfilter_frame(cpi, cm);
@@ -53,7 +54,7 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
 
     while (1)
     {
-        if (cpi->b_multi_threaded == 0)
+        if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
             break;
 
         if (sem_wait(&cpi->h_event_start_encoding[ithread]) == 0)
@@ -72,9 +73,14 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
             int *segment_counts = mbri->segment_counts;
             int *totalrate = &mbri->totalrate;
 
-            if (cpi->b_multi_threaded == 0) /* we're shutting down */
+            /* we're shutting down */
+            if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
                 break;
 
+            xd->mode_info_context = cm->mi + cm->mode_info_stride *
+                (ithread + 1);
+            xd->mode_info_stride = cm->mode_info_stride;
+
             for (mb_row = ithread + 1; mb_row < cm->mb_rows; mb_row += (cpi->encoding_thread_count + 1))
             {
 
@@ -85,8 +91,8 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
                 int recon_y_stride = cm->yv12_fb[ref_fb_idx].y_stride;
                 int recon_uv_stride = cm->yv12_fb[ref_fb_idx].uv_stride;
                 int map_index = (mb_row * cm->mb_cols);
-                volatile const int *last_row_current_mb_col;
-                volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
+                const int *last_row_current_mb_col;
+                int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
 
 #if  (CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING)
                 vp8_writer *w = &cpi->bc[1 + (mb_row % num_part)];
@@ -113,15 +119,14 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
                 /* for each macroblock col in image */
                 for (mb_col = 0; mb_col < cm->mb_cols; mb_col++)
                 {
-                    *current_mb_col = mb_col - 1;
+                    if (((mb_col - 1) % nsync) == 0) {
+                        pthread_mutex_t *mutex = &cpi->pmutex[mb_row];
+                        protected_write(mutex, current_mb_col, mb_col - 1);
+                    }
 
-                    if ((mb_col & (nsync - 1)) == 0)
-                    {
-                        while (mb_col > (*last_row_current_mb_col - nsync))
-                        {
-                            x86_pause_hint();
-                            thread_sleep(0);
-                        }
+                    if (mb_row && !(mb_col & (nsync - 1))) {
+                      pthread_mutex_t *mutex = &cpi->pmutex[mb_row-1];
+                      sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
                     }
 
 #if CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING
@@ -296,7 +301,8 @@ THREAD_FUNCTION thread_encoding_proc(void *p_data)
                                     xd->dst.u_buffer + 8,
                                     xd->dst.v_buffer + 8);
 
-                *current_mb_col = mb_col + nsync;
+                protected_write(&cpi->pmutex[mb_row], current_mb_col,
+                                mb_col + nsync);
 
                 /* this is to account for the border */
                 xd->mode_info_context++;
@@ -473,9 +479,6 @@ void vp8cx_init_mbrthread_data(VP8_COMP *cpi,
 
         mb->partition_info = x->pi + x->e_mbd.mode_info_stride * (i + 1);
 
-        mbd->mode_info_context = cm->mi   + x->e_mbd.mode_info_stride * (i + 1);
-        mbd->mode_info_stride  = cm->mode_info_stride;
-
         mbd->frame_type = cm->frame_type;
 
         mb->src = * cpi->Source;
@@ -517,6 +520,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
     cpi->encoding_thread_count = 0;
     cpi->b_lpf_running = 0;
 
+    pthread_mutex_init(&cpi->mt_mutex, NULL);
+
     if (cm->processor_core_count > 1 && cpi->oxcf.multi_threaded > 1)
     {
         int ithread;
@@ -580,7 +585,7 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
         if(rc)
         {
             /* shutdown other threads */
-            cpi->b_multi_threaded = 0;
+            protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
             for(--ithread; ithread >= 0; ithread--)
             {
                 pthread_join(cpi->h_encoding_thread[ithread], 0);
@@ -594,6 +599,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
             vpx_free(cpi->mb_row_ei);
             vpx_free(cpi->en_thread_data);
 
+            pthread_mutex_destroy(&cpi->mt_mutex);
+
             return -1;
         }
 
@@ -611,7 +618,7 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
             if(rc)
             {
                 /* shutdown other threads */
-                cpi->b_multi_threaded = 0;
+                protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
                 for(--ithread; ithread >= 0; ithread--)
                 {
                     sem_post(&cpi->h_event_start_encoding[ithread]);
@@ -628,6 +635,8 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
                 vpx_free(cpi->mb_row_ei);
                 vpx_free(cpi->en_thread_data);
 
+                pthread_mutex_destroy(&cpi->mt_mutex);
+
                 return -2;
             }
         }
@@ -637,10 +646,10 @@ int vp8cx_create_encoder_threads(VP8_COMP *cpi)
 
 void vp8cx_remove_encoder_threads(VP8_COMP *cpi)
 {
-    if (cpi->b_multi_threaded)
+    if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded))
     {
         /* shutdown other threads */
-        cpi->b_multi_threaded = 0;
+        protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
         {
             int i;
 
@@ -666,5 +675,6 @@ void vp8cx_remove_encoder_threads(VP8_COMP *cpi)
         vpx_free(cpi->mb_row_ei);
         vpx_free(cpi->en_thread_data);
     }
+    pthread_mutex_destroy(&cpi->mt_mutex);
 }
 #endif
index df5bcf68836480180cf60fe9a020f0466c5458c9..5a4b37dcff176a5a503428f3966712e0f8ae7cf2 100644 (file)
@@ -477,6 +477,18 @@ static void dealloc_compressor_data(VP8_COMP *cpi)
     cpi->mb.pip = 0;
 
 #if CONFIG_MULTITHREAD
+    /* De-allocate mutex */
+    if (cpi->pmutex != NULL) {
+        VP8_COMMON *const pc = &cpi->common;
+        int i;
+
+        for (i = 0; i < pc->mb_rows; i++) {
+            pthread_mutex_destroy(&cpi->pmutex[i]);
+        }
+        vpx_free(cpi->pmutex);
+        cpi->pmutex = NULL;
+    }
+
     vpx_free(cpi->mt_current_mb_col);
     cpi->mt_current_mb_col = NULL;
 #endif
@@ -1180,6 +1192,9 @@ void vp8_alloc_compressor_data(VP8_COMP *cpi)
 
     int width = cm->Width;
     int height = cm->Height;
+#if CONFIG_MULTITHREAD
+    int prev_mb_rows = cm->mb_rows;
+#endif
 
     if (vp8_alloc_frame_buffers(cm, width, height))
         vpx_internal_error(&cpi->common.error, VPX_CODEC_MEM_ERROR,
@@ -1271,6 +1286,25 @@ void vp8_alloc_compressor_data(VP8_COMP *cpi)
 
     if (cpi->oxcf.multi_threaded > 1)
     {
+        int i;
+
+        /* De-allocate and re-allocate mutex */
+        if (cpi->pmutex != NULL) {
+            for (i = 0; i < prev_mb_rows; i++) {
+                pthread_mutex_destroy(&cpi->pmutex[i]);
+            }
+            vpx_free(cpi->pmutex);
+            cpi->pmutex = NULL;
+        }
+
+        CHECK_MEM_ERROR(cpi->pmutex, vpx_malloc(sizeof(*cpi->pmutex) *
+                                                cm->mb_rows));
+        if (cpi->pmutex) {
+            for (i = 0; i < cm->mb_rows; i++) {
+                pthread_mutex_init(&cpi->pmutex[i], NULL);
+            }
+        }
+
         vpx_free(cpi->mt_current_mb_col);
         CHECK_MEM_ERROR(cpi->mt_current_mb_col,
                     vpx_malloc(sizeof(*cpi->mt_current_mb_col) * cm->mb_rows));
index 317e4b9e476aabf8ebdd3eac8843bf46de0b43c7..2b2f7a0a9ae77afd9ea9d92aad0165c0a54c0885 100644 (file)
@@ -530,6 +530,8 @@ typedef struct VP8_COMP
 
 #if CONFIG_MULTITHREAD
     /* multithread data */
+    pthread_mutex_t *pmutex;
+    pthread_mutex_t mt_mutex;           /* mutex for b_multi_threaded */
     int * mt_current_mb_col;
     int mt_sync_range;
     int b_multi_threaded;