From: Dan Zhu Date: Fri, 23 Aug 2019 01:20:38 +0000 (-0700) Subject: Add Anandan model X-Git-Tag: v1.8.2~126^2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=7f73dee0e5e0e6bdd2fe1f9718541c7cee455be7;p=libvpx Add Anandan model Change-Id: Ic3450125c83b41e7e4a953093b4d8177f04d220a --- diff --git a/tools/3D-Reconstruction/MotionEST/Anandan.py b/tools/3D-Reconstruction/MotionEST/Anandan.py new file mode 100644 index 000000000..b8a19a38d --- /dev/null +++ b/tools/3D-Reconstruction/MotionEST/Anandan.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python +# coding: utf-8 +import numpy as np +import numpy.linalg as LA +from scipy.ndimage.filters import gaussian_filter +from scipy.sparse import csc_matrix +from scipy.sparse.linalg import inv +from MotionEST import MotionEST +"""Anandan Model""" + + +class Anandan(MotionEST): + """ + constructor: + cur_f: current frame + ref_f: reference frame + blk_sz: block size + beta: smooth constrain weight + k1,k2,k3: confidence coefficients + max_iter: maximum number of iterations + """ + + def __init__(self, cur_f, ref_f, blk_sz, beta, k1, k2, k3, max_iter=100): + super(Anandan, self).__init__(cur_f, ref_f, blk_sz) + self.levels = int(np.log2(blk_sz)) + self.intensity_hierarchy() + self.c_maxs = [] + self.c_mins = [] + self.e_maxs = [] + self.e_mins = [] + for l in xrange(self.levels + 1): + c_max, c_min, e_max, e_min = self.get_curvature(self.cur_Is[l]) + self.c_maxs.append(c_max) + self.c_mins.append(c_min) + self.e_maxs.append(e_max) + self.e_mins.append(e_min) + self.beta = beta + self.k1, self.k2, self.k3 = k1, k2, k3 + self.max_iter = max_iter + + """ + build intensity hierarchy + """ + + def intensity_hierarchy(self): + level = 0 + self.cur_Is = [] + self.ref_Is = [] + #build each level itensity by using gaussian filters + while level <= self.levels: + cur_I = gaussian_filter(self.cur_yuv[:, :, 0], sigma=(2**level) * 0.56) + ref_I = gaussian_filter(self.ref_yuv[:, :, 0], sigma=(2**level) * 0.56) + self.ref_Is.append(ref_I) + self.cur_Is.append(cur_I) + level += 1 + + """ + get curvature of each block + """ + + def get_curvature(self, I): + c_max = np.zeros((self.num_row, self.num_col)) + c_min = np.zeros((self.num_row, self.num_col)) + e_max = np.zeros((self.num_row, self.num_col, 2)) + e_min = np.zeros((self.num_row, self.num_col, 2)) + for r in xrange(self.num_row): + for c in xrange(self.num_col): + h11, h12, h21, h22 = 0, 0, 0, 0 + for i in xrange(r * self.blk_sz, r * self.blk_sz + self.blk_sz): + for j in xrange(c * self.blk_sz, c * self.blk_sz + self.blk_sz): + if 0 <= i < self.height - 1 and 0 <= j < self.width - 1: + Ix = I[i][j + 1] - I[i][j] + Iy = I[i + 1][j] - I[i][j] + h11 += Iy * Iy + h12 += Ix * Iy + h21 += Ix * Iy + h22 += Ix * Ix + U, S, _ = LA.svd(np.array([[h11, h12], [h21, h22]])) + c_max[r, c], c_min[r, c] = S[0], S[1] + e_max[r, c] = U[:, 0] + e_min[r, c] = U[:, 1] + return c_max, c_min, e_max, e_min + + """ + get ssd of motion vector: + cur_I: current intensity + ref_I: reference intensity + center: current position + mv: motion vector + """ + + def get_ssd(self, cur_I, ref_I, center, mv): + ssd = 0 + for r in xrange(int(center[0]), int(center[0]) + self.blk_sz): + for c in xrange(int(center[1]), int(center[1]) + self.blk_sz): + if 0 <= r < self.height and 0 <= c < self.width: + tr, tc = r + int(mv[0]), c + int(mv[1]) + if 0 <= tr < self.height and 0 <= tc < self.width: + ssd += (ref_I[tr, tc] - cur_I[r, c])**2 + else: + ssd += cur_I[r, c]**2 + return ssd + + """ + get region match of level l + l: current level + last_mvs: matchine results of last level + radius: movenment radius + """ + + def region_match(self, l, last_mvs, radius): + mvs = np.zeros((self.num_row, self.num_col, 2)) + min_ssds = np.zeros((self.num_row, self.num_col)) + for r in xrange(self.num_row): + for c in xrange(self.num_col): + center = np.array([r * self.blk_sz, c * self.blk_sz]) + #use overlap hierarchy policy + init_mvs = [] + if last_mvs is None: + init_mvs = [np.array([0, 0])] + else: + for i, j in {(r, c), (r, c + 1), (r + 1, c), (r + 1, c + 1)}: + if 0 <= i < last_mvs.shape[0] and 0 <= j < last_mvs.shape[1]: + init_mvs.append(last_mvs[i, j]) + #use last matching results as the start postion as current level + min_ssd = None + min_mv = None + for init_mv in init_mvs: + for i in xrange(-2, 3): + for j in xrange(-2, 3): + mv = init_mv + np.array([i, j]) * radius + ssd = self.get_ssd(self.cur_Is[l], self.ref_Is[l], center, mv) + if min_ssd is None or ssd < min_ssd: + min_ssd = ssd + min_mv = mv + min_ssds[r, c] = min_ssd + mvs[r, c] = min_mv + return mvs, min_ssds + + """ + smooth motion field based on neighbor constraint + uvs: current estimation + mvs: matching results + min_ssds: minimum ssd of matching results + l: current level + """ + + def smooth(self, uvs, mvs, min_ssds, l): + sm_uvs = np.zeros((self.num_row, self.num_col, 2)) + c_max = self.c_maxs[l] + c_min = self.c_mins[l] + e_max = self.e_maxs[l] + e_min = self.e_mins[l] + for r in xrange(self.num_row): + for c in xrange(self.num_col): + w_max = c_max[r, c] / ( + self.k1 + self.k2 * min_ssds[r, c] + self.k3 * c_max[r, c]) + w_min = c_min[r, c] / ( + self.k1 + self.k2 * min_ssds[r, c] + self.k3 * c_min[r, c]) + w = w_max * w_min / (w_max + w_min + 1e-6) + if w < 0: + w = 0 + avg_uv = np.array([0.0, 0.0]) + for i, j in {(r - 1, c), (r + 1, c), (r, c - 1), (r, c + 1)}: + if 0 <= i < self.num_row and 0 <= j < self.num_col: + avg_uv += 0.25 * uvs[i, j] + sm_uvs[r, c] = (w * w * mvs[r, c] + self.beta * avg_uv) / ( + self.beta + w * w) + return sm_uvs + + """ + motion field estimation + """ + + def motion_field_estimation(self): + last_mvs = None + for l in xrange(self.levels, -1, -1): + mvs, min_ssds = self.region_match(l, last_mvs, 2**l) + uvs = np.zeros(mvs.shape) + for _ in xrange(self.max_iter): + uvs = self.smooth(uvs, mvs, min_ssds, l) + last_mvs = uvs + for r in xrange(self.num_row): + for c in xrange(self.num_col): + self.mf[r, c] = uvs[r, c]