granicus.if.org Git - handbrake/commitdiff
filter: add frame parallelizing filter wrapper
author John Stebbins <jstebbins.hb@gmail.com>
Wed, 31 May 2017 17:21:53 +0000 (10:21 -0700)
committer Bradley Sepos <bradley@bradleysepos.com>
Tue, 6 Jun 2017 15:42:09 +0000 (11:42 -0400)
This wrapper can be used to frame parallelize simple video filters. By
simple, I mean there can be no temporal context that is shared from one
frame to the next.

Wrap unsharp and lapsharp filters.  unsharp required a small rework to
separate out temporary storage that is required when processing each
frame.  We now need to duplicate this storage for each thread.

Closes #759.

libhb/common.c
libhb/common.h
libhb/hb.c
libhb/internal.h
libhb/mt_frame_filter.c [new file with mode: 0644]
libhb/unsharp.c

index 71a8d0e266dfcd48d65fc76d15692505bc24a55a..ebe51c6e504ac20728ac5dfa5572311eb643ac4d 100644 (file)
@@ -3861,6 +3861,7 @@ hb_filter_object_t * hb_filter_copy( hb_filter_object_t * filter )
     memcpy( filter_copy, filter, sizeof( hb_filter_object_t ) );
     if( filter->settings )
         filter_copy->settings = hb_value_dup(filter->settings);
+    filter_copy->sub_filter = hb_filter_copy(filter->sub_filter);
     return filter_copy;
 }
 
@@ -3997,6 +3998,10 @@ hb_filter_object_t * hb_filter_get( int filter_id )
             break;
 #endif
 
+        case HB_FILTER_MT_FRAME:
+            filter = &hb_filter_mt_frame;
+            break;
+
         default:
             filter = NULL;
             break;
@@ -4006,7 +4011,23 @@ hb_filter_object_t * hb_filter_get( int filter_id )
 
 hb_filter_object_t * hb_filter_init( int filter_id )
 {
-    return hb_filter_copy(hb_filter_get(filter_id));
+    switch (filter_id)
+    {
+        case HB_FILTER_UNSHARP:
+        case HB_FILTER_LAPSHARP:
+        {
+            hb_filter_object_t * wrapper;
+
+            wrapper = hb_filter_copy(hb_filter_get(HB_FILTER_MT_FRAME));
+            wrapper->sub_filter = hb_filter_copy(hb_filter_get(filter_id));
+            wrapper->id = filter_id;
+            wrapper->name = wrapper->sub_filter->name;
+            return wrapper;
+        } break;
+
+        default:
+            return hb_filter_copy(hb_filter_get(filter_id));
+    }
 }
 
 /**********************************************************************
@@ -4018,6 +4039,11 @@ void hb_filter_close( hb_filter_object_t ** _f )
 {
     hb_filter_object_t * f = *_f;
 
+    if (f == NULL)
+    {
+        return;
+    }
+    hb_filter_close(&f->sub_filter);
     hb_value_free(&f->settings);
 
     free( f );
index a251365ac5740b38f4e49d823d221c1f8ab9d596..2edbdc697e9c8d409de85c3a4ee1b87ed8c53f62 100644 (file)
@@ -1222,12 +1222,15 @@ struct hb_filter_object_s
     hb_dict_t           * settings;
 
 #ifdef __LIBHB__
-    int                (* init)     ( hb_filter_object_t *, hb_filter_init_t * );
-    int                (* post_init)( hb_filter_object_t *, hb_job_t * );
-    int                (* work)     ( hb_filter_object_t *,
-                                      hb_buffer_t **, hb_buffer_t ** );
-    void               (* close)    ( hb_filter_object_t * );
-    hb_filter_info_t * (* info)     ( hb_filter_object_t * );
+    int                (* init)       ( hb_filter_object_t *, hb_filter_init_t * );
+    int                (* init_thread)( hb_filter_object_t *, int );
+    int                (* post_init)  ( hb_filter_object_t *, hb_job_t * );
+    int                (* work)       ( hb_filter_object_t *,
+                                        hb_buffer_t **, hb_buffer_t ** );
+    int                (* work_thread)( hb_filter_object_t *,
+                                        hb_buffer_t **, hb_buffer_t **, int );
+    void               (* close)      ( hb_filter_object_t * );
+    hb_filter_info_t * (* info)       ( hb_filter_object_t * );
 
     const char          * settings_template;
 
@@ -1246,6 +1249,8 @@ struct hb_filter_object_s
     // These are used to bridge the chapter to the next buffer
     int                   chapter_val;
     int64_t               chapter_time;
+
+    hb_filter_object_t  * sub_filter;
 #endif
 };
 
@@ -1284,7 +1289,9 @@ enum
     HB_FILTER_QSV_POST,
     // default MSDK VPP filter
     HB_FILTER_QSV,
-    HB_FILTER_LAST = HB_FILTER_QSV
+    HB_FILTER_LAST = HB_FILTER_QSV,
+    // wrapper filter for frame based multi-threading of simple filters
+    HB_FILTER_MT_FRAME
 };
 
 hb_filter_object_t * hb_filter_get( int filter_id );
index f8bba2a66bab644653673010e5b979fd95282f6d..a680df91fb7300df1e9a9b337075ff7e6375e070 100644 (file)
@@ -1441,6 +1441,10 @@ void hb_add_filter_dict( hb_job_t * job, hb_filter_object_t * filter,
         settings = hb_value_dup(settings_in);
     }
     filter->settings = settings;
+    if (filter->sub_filter)
+    {
+        filter->sub_filter->settings = hb_value_dup(settings);
+    }
     if( filter->enforce_order )
     {
         // Find the position in the filter chain this filter belongs in
index 4ffd6422a47afcdebde1544f1360078b416b750e..5d462f37f9db9b23ed77389bd973204602ae52d8 100644 (file)
@@ -468,6 +468,7 @@ extern hb_filter_object_t hb_filter_pad;
 extern hb_filter_object_t hb_filter_lapsharp;
 extern hb_filter_object_t hb_filter_unsharp;
 extern hb_filter_object_t hb_filter_avfilter;
+extern hb_filter_object_t hb_filter_mt_frame;
 
 #ifdef USE_QSV
 extern hb_filter_object_t hb_filter_qsv;
diff --git a/libhb/mt_frame_filter.c b/libhb/mt_frame_filter.c
new file mode 100644 (file)
index 0000000..7ae009c
--- /dev/null
@@ -0,0 +1,236 @@
+/* mt_frame_filter.c
+
+   Copyright (c) 2003-2017 HandBrake Team
+   This file is part of the HandBrake source code
+   Homepage: <http://handbrake.fr/>.
+   It may be used under the terms of the GNU General Public License v2.
+   For full terms see the file COPYING file or visit http://www.gnu.org/licenses/gpl-2.0.html
+ */
+
+/* This is a pseudo-filter that wraps other filters to provide frame
+ * based multi-threading of the wrapped filter. The sub-filter must
+ * operate on each frame independently with no context carried over
+ * from one frame to the next. */
+
+#include "hb.h"
+#include "taskset.h"
+
+typedef struct
+{
+    hb_filter_private_t *pv;
+    int segment;
+    hb_buffer_t *out;
+} mt_frame_thread_arg_t;
+
+struct hb_filter_private_s
+{
+    hb_filter_object_t     * sub_filter;
+    hb_buffer_t           ** buf;
+    int                      frame_count;
+    taskset_t                taskset;
+    int                      thread_count;
+    mt_frame_thread_arg_t ** thread_data;
+};
+
+static int mt_frame_init(hb_filter_object_t *filter, hb_filter_init_t *init);
+static int mt_frame_work(hb_filter_object_t *filter,
+                         hb_buffer_t **buf_in,
+                         hb_buffer_t **buf_out);
+static void mt_frame_close(hb_filter_object_t *filter);
+
+static void mt_frame_filter_thread(void *thread_args_v);
+
+static const char mt_frame_template[] = "";
+
+hb_filter_object_t hb_filter_mt_frame =
+{
+    .id                = HB_FILTER_MT_FRAME,
+    .enforce_order     = 0,
+    .name              = "MTFrame (mtframe)",
+    .settings          = NULL,
+    .init              = mt_frame_init,
+    .work              = mt_frame_work,
+    .close             = mt_frame_close,
+    .settings_template = mt_frame_template,
+};
+
+static int mt_frame_init(hb_filter_object_t * filter,
+                         hb_filter_init_t   * init)
+{
+    filter->private_data = calloc(sizeof(struct hb_filter_private_s), 1);
+    hb_filter_private_t *pv = filter->private_data;
+
+    pv->sub_filter = filter->sub_filter;
+    pv->sub_filter->init(pv->sub_filter, init);
+
+    pv->thread_count = hb_get_cpu_count();
+    pv->buf = calloc(pv->thread_count, sizeof(hb_buffer_t*));
+
+    pv->thread_data = malloc(pv->thread_count * sizeof(mt_frame_thread_arg_t*));
+    if (taskset_init(&pv->taskset, pv->thread_count,
+                     sizeof(mt_frame_thread_arg_t)) == 0)
+    {
+        hb_error("MTFrame could not initialize taskset");
+        goto fail;
+    }
+
+    for (int ii = 0; ii < pv->thread_count; ii++)
+    {
+        pv->thread_data[ii] = taskset_thread_args(&pv->taskset, ii);
+        if (pv->thread_data[ii] == NULL)
+        {
+            hb_error("MTFrame could not create thread args");
+            goto fail;
+        }
+        pv->thread_data[ii]->pv = pv;
+        pv->thread_data[ii]->segment = ii;
+        if (taskset_thread_spawn(&pv->taskset, ii, "mt_frame_filter",
+                             mt_frame_filter_thread, HB_NORMAL_PRIORITY) == 0)
+        {
+            hb_error("MTFrame could not spawn thread");
+            goto fail;
+        }
+    }
+
+    if (pv->sub_filter->init_thread != NULL)
+    {
+        if (pv->sub_filter->init_thread(pv->sub_filter, pv->thread_count) < 0)
+        {
+            goto fail;
+        }
+    }
+
+    return 0;
+
+fail:
+    taskset_fini(&pv->taskset);
+    free(pv->thread_data);
+    free(pv);
+    return -1;
+}
+
+static void mt_frame_close(hb_filter_object_t *filter)
+{
+    hb_filter_private_t *pv = filter->private_data;
+
+    if (pv == NULL)
+    {
+        return;
+    }
+
+    pv->sub_filter->close(pv->sub_filter);
+    taskset_fini(&pv->taskset);
+    free(pv->thread_data);
+    free(pv->buf);
+    free(pv);
+    filter->private_data = NULL;
+}
+
+static void mt_frame_filter_thread(void *thread_args_v)
+{
+    mt_frame_thread_arg_t *thread_data = thread_args_v;
+    hb_filter_private_t *pv = thread_data->pv;
+    int segment = thread_data->segment;
+
+    hb_log("MTFrame thread started for segment %d", segment);
+
+    while (1)
+    {
+        // Wait until there is work to do.
+        taskset_thread_wait4start(&pv->taskset, segment);
+
+        if (taskset_thread_stop(&pv->taskset, segment))
+        {
+            break;
+        }
+
+        if (pv->sub_filter->work_thread != NULL)
+        {
+            pv->sub_filter->work_thread(pv->sub_filter,
+                                 &pv->buf[segment], &thread_data->out, segment);
+        }
+        else
+        {
+            pv->sub_filter->work(pv->sub_filter,
+                                 &pv->buf[segment], &thread_data->out);
+        }
+        if (pv->buf[segment] != NULL)
+        {
+            hb_buffer_close(&pv->buf[segment]);
+        }
+
+        // Finished this segment, notify.
+        taskset_thread_complete(&pv->taskset, segment);
+    }
+    taskset_thread_complete(&pv->taskset, segment);
+}
+
+static hb_buffer_t * mt_frame_filter(hb_filter_private_t *pv)
+{
+    if (pv->frame_count < pv->thread_count)
+    {
+        return NULL;
+    }
+
+    taskset_cycle(&pv->taskset);
+    pv->frame_count = 0;
+
+    // Collect results from taskset
+    hb_buffer_list_t list;
+    hb_buffer_list_clear(&list);
+    for (int t = 0; t < pv->thread_count; t++)
+    {
+        hb_buffer_list_append(&list, pv->thread_data[t]->out);
+    }
+    return hb_buffer_list_clear(&list);
+}
+
+static hb_buffer_t * mt_frame_filter_flush(hb_filter_private_t *pv)
+{
+    hb_buffer_list_t list;
+
+    hb_buffer_list_clear(&list);
+    for (int f = 0; f < pv->frame_count; f++)
+    {
+        hb_buffer_t * out;
+        pv->sub_filter->work(pv->sub_filter, &pv->buf[f], &out);
+
+        if (pv->buf[f] != NULL)
+        {
+            hb_buffer_close(&pv->buf[f]);
+        }
+        hb_buffer_list_append(&list, out);
+    }
+    pv->frame_count = 0;
+    return hb_buffer_list_clear(&list);
+}
+
+static int mt_frame_work(hb_filter_object_t  * filter,
+                         hb_buffer_t        ** buf_in,
+                         hb_buffer_t        ** buf_out )
+{
+    hb_filter_private_t *pv = filter->private_data;
+    hb_buffer_t *in = *buf_in;
+
+    *buf_in  = NULL;
+    if (in->s.flags & HB_BUF_FLAG_EOF)
+    {
+        hb_buffer_list_t list;
+        hb_buffer_t *buf;
+
+        // Flush buffered frames
+        buf = mt_frame_filter_flush(pv);
+        hb_buffer_list_set(&list, buf);
+
+        // And terminate the buffer list with an EOF buffer
+        hb_buffer_list_append(&list, in);
+        *buf_out = hb_buffer_list_clear(&list);
+
+        return HB_FILTER_DONE;
+    }
+
+    pv->buf[pv->frame_count++] = in;
+    *buf_out = mt_frame_filter(pv);
+
+    return HB_FILTER_OK;
+}
index 2d5a1d5773c28215e685098bb8d07203c7816f02..8f1c99e6bb88c29ddf4eabb4f0b2672cb09cf5f8 100644 (file)
@@ -19,6 +19,8 @@
 
 typedef struct
 {
+    int        pix_fmt;   // source pixel format
+    int        width;     // source video width
     double     strength;  // strength
     int        size;      // pixel context region width (must be odd)
 
@@ -26,24 +28,37 @@ typedef struct
     int        amount;
     int        scalebits;
     int32_t    halfscale;
-    uint32_t * SC[UNSHARP_SIZE_MAX - 1];
 } unsharp_plane_context_t;
 
+typedef struct
+{
+    uint32_t * SC[UNSHARP_SIZE_MAX - 1];
+} unsharp_thread_context_t;
+
+typedef unsharp_thread_context_t unsharp_thread_context3_t[3];
+
 struct hb_filter_private_s
 {
-    unsharp_plane_context_t  plane_ctx[3];
+    unsharp_plane_context_t     plane_ctx[3];
+    unsharp_thread_context3_t * thread_ctx;
+    int                         threads;
 };
 
-static int hb_unsharp_init(hb_filter_object_t *filter,
-                           hb_filter_init_t   *init);
+static int unsharp_init(hb_filter_object_t *filter,
+                        hb_filter_init_t   *init);
+
+static int unsharp_init_thread(hb_filter_object_t *filter, int threads);
 
-static int hb_unsharp_work(hb_filter_object_t *filter,
-                           hb_buffer_t ** buf_in,
-                           hb_buffer_t ** buf_out);
+static int unsharp_work(hb_filter_object_t *filter,
+                        hb_buffer_t ** buf_in,
+                        hb_buffer_t ** buf_out);
+static int unsharp_work_thread(hb_filter_object_t *filter,
+                               hb_buffer_t ** buf_in,
+                               hb_buffer_t ** buf_out, int thread);
 
-static void hb_unsharp_close(hb_filter_object_t *filter);
+static void unsharp_close(hb_filter_object_t *filter);
 
-static const char hb_unsharp_template[] =
+static const char unsharp_template[] =
     "y-strength=^"HB_FLOAT_REG"$:y-size=^"HB_INT_REG"$:"
     "cb-strength=^"HB_FLOAT_REG"$:cb-size=^"HB_INT_REG"$:"
     "cr-strength=^"HB_FLOAT_REG"$:cr-size=^"HB_INT_REG"$";
@@ -54,20 +69,23 @@ hb_filter_object_t hb_filter_unsharp =
     .enforce_order     = 1,
     .name              = "Sharpen (unsharp)",
     .settings          = NULL,
-    .init              = hb_unsharp_init,
-    .work              = hb_unsharp_work,
-    .close             = hb_unsharp_close,
-    .settings_template = hb_unsharp_template,
+    .init              = unsharp_init,
+    .init_thread       = unsharp_init_thread,
+    .work              = unsharp_work,
+    .work_thread       = unsharp_work_thread,
+    .close             = unsharp_close,
+    .settings_template = unsharp_template,
 };
 
-static void hb_unsharp(const uint8_t *src,
-                             uint8_t *dst,
-                       const int width,
-                       const int height,
-                       const int stride,
-                       unsharp_plane_context_t * ctx)
+static void unsharp(const uint8_t *src,
+                          uint8_t *dst,
+                    const int width,
+                    const int height,
+                    const int stride,
+                    unsharp_plane_context_t * ctx,
+                    unsharp_thread_context_t * tctx)
 {
-    uint32_t **SC = ctx->SC;
+    uint32_t **SC = tctx->SC;
     uint32_t SR[UNSHARP_SIZE_MAX - 1],
              Tmp1,
              Tmp2;
@@ -138,10 +156,15 @@ static void hb_unsharp(const uint8_t *src,
     }
 }
 
-static int hb_unsharp_init(hb_filter_object_t *filter,
-                           hb_filter_init_t   *init)
+static int unsharp_init(hb_filter_object_t *filter,
+                        hb_filter_init_t   *init)
 {
     filter->private_data = calloc(sizeof(struct hb_filter_private_s), 1);
+    if (filter->private_data == NULL)
+    {
+        hb_error("Unsharp calloc failed");
+        return -1;
+    }
     hb_filter_private_t * pv = filter->private_data;
 
     // Mark parameters unset
@@ -179,7 +202,8 @@ static int hb_unsharp_init(hb_filter_object_t *filter,
     for (int c = 0; c < 3; c++)
     {
         unsharp_plane_context_t * ctx = &pv->plane_ctx[c];
-        int w = hb_image_width(init->pix_fmt, init->geometry.width, c);
+
+        ctx->width = init->geometry.width;
 
         // Replace unset values with defaults
         if (ctx->strength == -1)
@@ -204,44 +228,85 @@ static int hb_unsharp_init(hb_filter_object_t *filter,
         ctx->steps     = ctx->size / 2;
         ctx->scalebits = ctx->steps * 4;
         ctx->halfscale = 1 << (ctx->scalebits - 1);
+    }
 
-        int z;
-        for (z = 0; z < 2 * ctx->steps; z++)
-        {
-            ctx->SC[z] = malloc(sizeof(*(ctx->SC[z])) * (w + 2 * ctx->steps));
-        }
+    if (unsharp_init_thread(filter, 1) < 0)
+    {
+        unsharp_close(filter);
+        return -1;
     }
 
     return 0;
 }
 
-static void hb_unsharp_close(hb_filter_object_t * filter)
+static void unsharp_thread_close(hb_filter_private_t *pv)
 {
-    hb_filter_private_t *pv = filter->private_data;
-
-    if (pv == NULL)
+    int c, z;
+    for (c = 0; c < 3; c++)
     {
-        return;
+        unsharp_plane_context_t * ctx = &pv->plane_ctx[c];
+        for (int t = 0; t < pv->threads; t++)
+        {
+            unsharp_thread_context_t * tctx = &pv->thread_ctx[t][c];
+            for (z = 0; z < 2 * ctx->steps; z++)
+            {
+                free(tctx->SC[z]);
+                tctx->SC[z] = NULL;
+            }
+        }
     }
+    free(pv->thread_ctx);
+}
 
-    int c, z;
-    for (c = 0; c < 3; c++)
+static int unsharp_init_thread(hb_filter_object_t *filter, int threads)
+{
+    hb_filter_private_t * pv = filter->private_data;
+
+    unsharp_thread_close(pv);
+    pv->thread_ctx = calloc(threads, sizeof(unsharp_thread_context3_t));
+    pv->threads = threads;
+    for (int c = 0; c < 3; c++)
     {
         unsharp_plane_context_t * ctx = &pv->plane_ctx[c];
-        for (z = 0; z < ctx->steps; z++)
+        int w = hb_image_width(ctx->pix_fmt, ctx->width, c);
+
+        for (int t = 0; t < threads; t++)
         {
-            free(ctx->SC[z]);
-            ctx->SC[z] = NULL;
+            unsharp_thread_context_t * tctx = &pv->thread_ctx[t][c];
+            int z;
+            for (z = 0; z < 2 * ctx->steps; z++)
+            {
+                tctx->SC[z] = malloc(sizeof(*(tctx->SC[z])) *
+                                     (w + 2 * ctx->steps));
+                if (tctx->SC[z] == NULL)
+                {
+                    hb_error("Unsharp calloc failed");
+                    unsharp_close(filter);
+                    return -1;
+                }
+            }
         }
     }
+    return 0;
+}
+
+static void unsharp_close(hb_filter_object_t * filter)
+{
+    hb_filter_private_t *pv = filter->private_data;
+
+    if (pv == NULL)
+    {
+        return;
+    }
 
+    unsharp_thread_close(pv);
     free(pv);
     filter->private_data = NULL;
 }
 
-static int hb_unsharp_work(hb_filter_object_t *filter,
-                           hb_buffer_t ** buf_in,
-                           hb_buffer_t ** buf_out)
+static int unsharp_work_thread(hb_filter_object_t *filter,
+                               hb_buffer_t ** buf_in,
+                               hb_buffer_t ** buf_out, int thread)
 {
     hb_filter_private_t *pv = filter->private_data;
     hb_buffer_t *in = *buf_in, *out;
@@ -258,13 +323,14 @@ static int hb_unsharp_work(hb_filter_object_t *filter,
     int c;
     for (c = 0; c < 3; c++)
     {
-        unsharp_plane_context_t * ctx = &pv->plane_ctx[c];
-        hb_unsharp(in->plane[c].data,
-                   out->plane[c].data,
-                   in->plane[c].width,
-                   in->plane[c].height,
-                   in->plane[c].stride,
-                   ctx);
+        unsharp_plane_context_t  * ctx  = &pv->plane_ctx[c];
+        unsharp_thread_context_t * tctx = &pv->thread_ctx[thread][c];
+        unsharp(in->plane[c].data,
+                out->plane[c].data,
+                in->plane[c].width,
+                in->plane[c].height,
+                in->plane[c].stride,
+                ctx, tctx);
     }
 
     out->s = in->s;
@@ -272,3 +338,10 @@ static int hb_unsharp_work(hb_filter_object_t *filter,
 
     return HB_FILTER_OK;
 }
+
+static int unsharp_work(hb_filter_object_t *filter,
+                        hb_buffer_t ** buf_in,
+                        hb_buffer_t ** buf_out)
+{
+    return unsharp_work_thread(filter, buf_in, buf_out, 0);
+}