]> granicus.if.org Git - libvpx/commitdiff
Add Anandan model
authorDan Zhu <zxdan@google.com>
Fri, 23 Aug 2019 01:20:38 +0000 (18:20 -0700)
committerDan Zhu <zxdan@google.com>
Fri, 23 Aug 2019 01:20:38 +0000 (18:20 -0700)
Change-Id: Ic3450125c83b41e7e4a953093b4d8177f04d220a

tools/3D-Reconstruction/MotionEST/Anandan.py [new file with mode: 0644]

diff --git a/tools/3D-Reconstruction/MotionEST/Anandan.py b/tools/3D-Reconstruction/MotionEST/Anandan.py
new file mode 100644 (file)
index 0000000..b8a19a3
--- /dev/null
@@ -0,0 +1,185 @@
+#!/usr/bin/env python
+# coding: utf-8
+import numpy as np
+import numpy.linalg as LA
+from scipy.ndimage.filters import gaussian_filter
+from scipy.sparse import csc_matrix
+from scipy.sparse.linalg import inv
+from MotionEST import MotionEST
+"""Anandan Model"""
+
+
+class Anandan(MotionEST):
+  """
+    constructor:
+        cur_f: current frame
+        ref_f: reference frame
+        blk_sz: block size
+        beta: smooth constrain weight
+        k1,k2,k3: confidence coefficients
+        max_iter: maximum number of iterations
+    """
+
+  def __init__(self, cur_f, ref_f, blk_sz, beta, k1, k2, k3, max_iter=100):
+    super(Anandan, self).__init__(cur_f, ref_f, blk_sz)
+    self.levels = int(np.log2(blk_sz))
+    self.intensity_hierarchy()
+    self.c_maxs = []
+    self.c_mins = []
+    self.e_maxs = []
+    self.e_mins = []
+    for l in xrange(self.levels + 1):
+      c_max, c_min, e_max, e_min = self.get_curvature(self.cur_Is[l])
+      self.c_maxs.append(c_max)
+      self.c_mins.append(c_min)
+      self.e_maxs.append(e_max)
+      self.e_mins.append(e_min)
+    self.beta = beta
+    self.k1, self.k2, self.k3 = k1, k2, k3
+    self.max_iter = max_iter
+
+  """
+    build intensity hierarchy
+    """
+
+  def intensity_hierarchy(self):
+    level = 0
+    self.cur_Is = []
+    self.ref_Is = []
+    #build each level itensity by using gaussian filters
+    while level <= self.levels:
+      cur_I = gaussian_filter(self.cur_yuv[:, :, 0], sigma=(2**level) * 0.56)
+      ref_I = gaussian_filter(self.ref_yuv[:, :, 0], sigma=(2**level) * 0.56)
+      self.ref_Is.append(ref_I)
+      self.cur_Is.append(cur_I)
+      level += 1
+
+  """
+    get curvature of each block
+    """
+
+  def get_curvature(self, I):
+    c_max = np.zeros((self.num_row, self.num_col))
+    c_min = np.zeros((self.num_row, self.num_col))
+    e_max = np.zeros((self.num_row, self.num_col, 2))
+    e_min = np.zeros((self.num_row, self.num_col, 2))
+    for r in xrange(self.num_row):
+      for c in xrange(self.num_col):
+        h11, h12, h21, h22 = 0, 0, 0, 0
+        for i in xrange(r * self.blk_sz, r * self.blk_sz + self.blk_sz):
+          for j in xrange(c * self.blk_sz, c * self.blk_sz + self.blk_sz):
+            if 0 <= i < self.height - 1 and 0 <= j < self.width - 1:
+              Ix = I[i][j + 1] - I[i][j]
+              Iy = I[i + 1][j] - I[i][j]
+              h11 += Iy * Iy
+              h12 += Ix * Iy
+              h21 += Ix * Iy
+              h22 += Ix * Ix
+        U, S, _ = LA.svd(np.array([[h11, h12], [h21, h22]]))
+        c_max[r, c], c_min[r, c] = S[0], S[1]
+        e_max[r, c] = U[:, 0]
+        e_min[r, c] = U[:, 1]
+    return c_max, c_min, e_max, e_min
+
+  """
+    get ssd of motion vector:
+      cur_I: current intensity
+      ref_I: reference intensity
+      center: current position
+      mv: motion vector
+    """
+
+  def get_ssd(self, cur_I, ref_I, center, mv):
+    ssd = 0
+    for r in xrange(int(center[0]), int(center[0]) + self.blk_sz):
+      for c in xrange(int(center[1]), int(center[1]) + self.blk_sz):
+        if 0 <= r < self.height and 0 <= c < self.width:
+          tr, tc = r + int(mv[0]), c + int(mv[1])
+          if 0 <= tr < self.height and 0 <= tc < self.width:
+            ssd += (ref_I[tr, tc] - cur_I[r, c])**2
+          else:
+            ssd += cur_I[r, c]**2
+    return ssd
+
+  """
+    get region match of level l
+      l: current level
+      last_mvs: matchine results of last level
+      radius: movenment radius
+    """
+
+  def region_match(self, l, last_mvs, radius):
+    mvs = np.zeros((self.num_row, self.num_col, 2))
+    min_ssds = np.zeros((self.num_row, self.num_col))
+    for r in xrange(self.num_row):
+      for c in xrange(self.num_col):
+        center = np.array([r * self.blk_sz, c * self.blk_sz])
+        #use overlap hierarchy policy
+        init_mvs = []
+        if last_mvs is None:
+          init_mvs = [np.array([0, 0])]
+        else:
+          for i, j in {(r, c), (r, c + 1), (r + 1, c), (r + 1, c + 1)}:
+            if 0 <= i < last_mvs.shape[0] and 0 <= j < last_mvs.shape[1]:
+              init_mvs.append(last_mvs[i, j])
+        #use last matching results as the start postion as current level
+        min_ssd = None
+        min_mv = None
+        for init_mv in init_mvs:
+          for i in xrange(-2, 3):
+            for j in xrange(-2, 3):
+              mv = init_mv + np.array([i, j]) * radius
+              ssd = self.get_ssd(self.cur_Is[l], self.ref_Is[l], center, mv)
+              if min_ssd is None or ssd < min_ssd:
+                min_ssd = ssd
+                min_mv = mv
+        min_ssds[r, c] = min_ssd
+        mvs[r, c] = min_mv
+    return mvs, min_ssds
+
+  """
+    smooth motion field based on neighbor constraint
+      uvs: current estimation
+      mvs: matching results
+      min_ssds: minimum ssd of matching results
+      l: current level
+    """
+
+  def smooth(self, uvs, mvs, min_ssds, l):
+    sm_uvs = np.zeros((self.num_row, self.num_col, 2))
+    c_max = self.c_maxs[l]
+    c_min = self.c_mins[l]
+    e_max = self.e_maxs[l]
+    e_min = self.e_mins[l]
+    for r in xrange(self.num_row):
+      for c in xrange(self.num_col):
+        w_max = c_max[r, c] / (
+            self.k1 + self.k2 * min_ssds[r, c] + self.k3 * c_max[r, c])
+        w_min = c_min[r, c] / (
+            self.k1 + self.k2 * min_ssds[r, c] + self.k3 * c_min[r, c])
+        w = w_max * w_min / (w_max + w_min + 1e-6)
+        if w < 0:
+          w = 0
+        avg_uv = np.array([0.0, 0.0])
+        for i, j in {(r - 1, c), (r + 1, c), (r, c - 1), (r, c + 1)}:
+          if 0 <= i < self.num_row and 0 <= j < self.num_col:
+            avg_uv += 0.25 * uvs[i, j]
+        sm_uvs[r, c] = (w * w * mvs[r, c] + self.beta * avg_uv) / (
+            self.beta + w * w)
+    return sm_uvs
+
+  """
+    motion field estimation
+    """
+
+  def motion_field_estimation(self):
+    last_mvs = None
+    for l in xrange(self.levels, -1, -1):
+      mvs, min_ssds = self.region_match(l, last_mvs, 2**l)
+      uvs = np.zeros(mvs.shape)
+      for _ in xrange(self.max_iter):
+        uvs = self.smooth(uvs, mvs, min_ssds, l)
+      last_mvs = uvs
+    for r in xrange(self.num_row):
+      for c in xrange(self.num_col):
+        self.mf[r, c] = uvs[r, c]