Source code for miop.factorization

# Original code from Stefan Toeberg (2023)
# Copyright (c) 2023, Stefan Toeberg.
#
# Modified by Maxime Paschoud (2025)
# Copyright (c) 2025, Maxime Paschoud.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# (http://opensource.org/licenses/BSD-3-Clause)
#
# __author__ = "Stefan Toeberg, LUH: IMR; Maxime Paschoud, ETHZ"
# __description__ = """
#                   factorization methods and related algorithms regarding
#                   the basic orthographic approach and the extended
#                   scaled orthographic approach
#                   """
#  

from pipeline import DAGNode

import warnings

import numpy as np


[docs] class Factorization(DAGNode): """ Builds a measurement matrix from corresponding points and factorizes it to recover the movement of the cameras. Attributes ---------- reproErrorTh : float Threshold for the reprojection error. max_matches : int Maximum number of matches to consider (default is 0, meaning no limit). batch_size : int Number of images to process in each batch (default is 4). rot_angles : list List of rotation angles for the camera movements. Set in eval(). """ def __init__(self, reproErrorTh, normalized_view, max_matches=0, batch_size=4): """ Parameters ---------- reproErrorTh : float The reprojection error threshold for outlier removal. normalized_view : int Index of the normalized view to be used for scaling. max_matches : int, optional Maximum number of matches to consider (default is 0, no limit). batch_size : int, optional Number of images to process in each batch (default is 4). Warns ----- Warning If batch_size is less than 3, a warning is issued. """ super().__init__() self.reproErrorTh = reproErrorTh self.normalized_view = normalized_view self.max_matches = max_matches if batch_size < 3: warnings.warn("It is recommended to have a batch_size of at least 3.") self.batch_size = batch_size self.rot_angles = []
[docs] def eval(self, matches): """ Computes camera parameters from corresponding matches using factorization on batches of images. The camera movements for each batch are propagated to the rest of the face. The first camera of each face is used as relative origin with zero tilt. The tilt angles are forced positive, so the image collection should have monotonically increasing tilt angles. Parameters ---------- matches : list of list of numpy.ndarray Corresponding points across the image collection. The first list level represents faces. Returns ------- list A list containing the computed camera parameters for each face. """ print("Start of Factorization Node...") camera_param = [] for matches_face in matches: camera_param.append({}) matches_face = np.asarray(matches_face) m, n, _ = matches_face.shape if self.max_matches != 0: sampled_indices = np.random.choice(n, self.max_matches, replace=False) matches_face = matches_face[:, sampled_indices, :] # compute camera movements batch_size images at a time i = 0 while i < len(matches_face): next_i = i + self.batch_size - 1 if len(matches_face) - next_i < self.batch_size: meas_mat, centroids = self.build_measurement_matrix(matches_face[i:]) next_i = len(matches_face) else: meas_mat, centroids = self.build_measurement_matrix(matches_face[i:(i + self.batch_size)]) cameras_face = self.factorize(meas_mat, centroids) # rotate in positive direction angles = self.rotation_angles_arctan(cameras_face['rotations'][1]) if np.max(np.abs(angles)) != np.max(angles): # inverse rotation matrix cameras_face['rotations'] = cameras_face['rotations'].transpose(0, 2, 1) if i == 0: camera_param[-1] = cameras_face else: # propagate prev_rot = camera_param[-1]['rotations'][-1] prev_scaling = camera_param[-1]['intrinsics'][0][-1] cameras_face['rotations'] @= prev_rot cameras_face['intrinsics'][0] *= prev_scaling camera_param[-1]['rotations'] = np.concatenate((camera_param[-1]['rotations'], cameras_face['rotations'][1:])) camera_param[-1]['intrinsics'][0] = np.concatenate((camera_param[-1]['intrinsics'][0], cameras_face['intrinsics'][0][1:])) i = next_i self.camera_param = camera_param print("Estimated rotation angles:") rot_angles = [] for i, param_face in enumerate(camera_param): print(f"Face {i}:") rot_angles.append(self.print_rotation_angles(param_face['rotations'])) self.rot_angles = rot_angles print("End of Factorization.") return [camera_param]
[docs] def build_measurement_matrix(self, matches): """ Constructs a measurement matrix from corresponding points in multiple images. Parameters ---------- matches : numpy.ndarray Array of shape (m, n, 2), where m is number of images, n is number of points, and 2 corresponds to (x, y) coordinates. Returns ------- tuple meas_mat : numpy.ndarray The constructed measurement matrix of shape (2*m, n). centers : numpy.ndarray The centroids of the matches used for centering the points, shape (m, 2). """ # center the data centers = np.mean(matches, axis=1) matches_centered = matches - centers[:, None, :] # construct registered Measurement Matrix meas_mat = np.empty((2*len(matches_centered), matches_centered.shape[1])) meas_mat[:matches_centered.shape[0], :] = matches_centered[..., 0] meas_mat[matches_centered.shape[0]:, :] = matches_centered[..., 1] return meas_mat, centers
[docs] def factorize(self, measurementMatrix, centroids): """ Factorizes the measurement matrix to estimate camera rotations and intrinsics. Parameters ---------- measurementMatrix : numpy.ndarray The measurement matrix to factorize. centroids : numpy.ndarray Centroids of matched points for adjustment during factorization. Returns ------- dict Dictionary containing: - 'rotations': numpy.ndarray of estimated rotation matrix. - 'intrinsics': list containing normalized scaling factors and parameters. """ W = measurementMatrix resultFactor = self.factorization_sc(W, decomp='eigen', normRot=0) reproW, _ = self.reprojected_measurement_matrix(resultFactor) reproErrW = self.signed_error(reproW, W) max_iter = 10 i = 0 while self.compute_rmse(reproErrW.flatten()) > self.reproErrorTh and i < max_iter: W, _ = self.remove_outliers_repro(W, reproW, self.reproErrorTh) # correct the registration and translate the centroid in the origin W = W - np.vstack(np.mean(W, axis=1)) # correct the previously computed centroid centroids = centroids + np.hstack((np.vstack(np.mean(W, axis=1))[:int(len( W)/2)], np.vstack(np.mean(W, axis=1))[int(len(W)/2):])) # update result resultFactor = self.factorization_sc(W, decomp="eigen", normRot=0) reproW, _ = self.reprojected_measurement_matrix(resultFactor) reproErrW = self.signed_error(reproW, W) i += 1 camDict = {} camDict['rotations'] = np.asarray(resultFactor[0]) camDict['intrinsics'] = [resultFactor[2] / resultFactor[2][self.normalized_view], resultFactor[3]] print(f"{np.shape(measurementMatrix)[1]-np.shape(W)[1]} matche(s) removed due to the reprojection error threshold of {self.reproErrorTh} px.") rot_angles = self.print_rotation_angles(resultFactor[0], name='SC') scaling_factors = self.print_scaling_factors(resultFactor[2]) print('RMSE of the reprojection error is given as ' '{:.4f} px.'.format(self.compute_rmse(reproErrW.flatten()))) return camDict
[docs] def print_scaling_factors(self, scaling_factors): """ Normalizes and prints the relative scaling factors of each view. Parameters ---------- scaling_factors : numpy.ndarray Scaling factors for each camera view. Returns ------- numpy.ndarray Normalized scaling factors with respect to the normalized view. """ s = scaling_factors / scaling_factors[self.normalized_view] print('Relative scaling factors of each view:\n{}\n'.format(s, end='', flush=True)) return s
[docs] def print_rotation_angles(self, R, name='SC'): """ Prints rotation angles for each rotation matrix in degrees. Parameters ---------- R : numpy.ndarray Rotation matrices array, shape (num_cameras, 3, 3). name : str, optional Name to prefix the print statements (default is 'SC'). Returns ------- list of numpy.ndarray List of rotation angles [phiX, phiY, phiZ] in degrees for each rotation matrix. """ print(f"{name} Factorization - Rotation Angles [phiX, phiY, phiZ]:") rotAngles = [] for i,r in enumerate(R): angles = self.rotation_angles_arctan(r) print(f"Image {i}: {angles}") rotAngles.append(angles) return rotAngles
[docs] def rotation_angles_arctan(self, R): """ Extract rotation angles (in degrees) from a rotation matrix using arctangent. Parameters ---------- R : numpy.ndarray A 3x3 rotation matrix. Returns ------- numpy.ndarray Rotation angles [x, y, z] in degrees. """ # extract the rotation angles from a rotation matrix x = np.arctan2(R[2, 1], R[2, 2]) y = np.arctan2(-R[2, 0], np.sqrt(R[2, 1]**2 + R[2, 2]**2)) z = np.arctan2(R[1, 0], R[0, 0]) return np.array([x, y, z]) / np.pi * 180
[docs] def compute_rmse(self, distances): """ Computes the root mean square error (RMSE) from a list of distances. Parameters ---------- distances : array-like List or array of distance errors. Returns ------- float The computed RMSE value. """ n = len(distances) rmse = np.sqrt((1/n) * np.sum((distances)**2)) return rmse
[docs] def factorization_sc(self, W, decomp="eigen", normRot=0): """ Performs factorization using the scaling and camera motion model described by Poelman and Kanade. Parameters ---------- W : numpy.ndarray The measurement matrix containing point correspondences across frames. decomp : str, optional Decomposition method to use: 'eigen' or 'cholesky' (default is 'eigen'). normRot : int, optional Index of the camera used as reference for normalization (default is 0). Returns ------- tuple RotationMatrices : list of numpy.ndarray List of rotation matrices for each frame. Shape : numpy.ndarray The estimated 3D shape matrix. kiNew : list Normalized scaling factors for each frame. params : numpy.ndarray Additional parameters (2-element array). Q : numpy.ndarray The matrix used for correction. ki : list Original scaling factors for each frame. """ nof = int(len(W[:, 0]) / 2) U, s, V = np.linalg.svd(W) U = U[:, 0:3] s = s[0:3] s12 = np.diag(np.sqrt(s)) V = V[0:3, :] R = U @ s12 S = s12 @ V # build linear system of scaled orthographic constraints Ax=b iT = R[0:nof] jT = R[nof:] # build linear system of metric constraints Ax=b G = np.vstack((self.gT(iT, iT) - self.gT(jT, jT), self.gT(iT, jT), self.gT1(iT[0, :], iT[0, :]))) b = np.vstack((np.zeros((nof * 2, 1)), 1)) b = np.squeeze(np.array(b).T) # solve with Moore-Penrose Pseudoinverse I = np.linalg.pinv(G) @ b # build the symmetric matrix L L = np.array([[I[0], I[1], I[2]], [I[1], I[3], I[4]], [I[2], I[4], I[5]]]) # eigendecompose L to compute the nearest positive definite matrix w, v = np.linalg.eig(L) # set negative eigenvalues to eps>0 to get closest positive definite # matrix in frobenius norm w = w.clip(min=0) w[w == 0] = 10 ** (-9) if decomp == "cholesky": # use cholesky decomposition to obtain Q # get L positive definite Lpd = v @ np.diag(w) @ v.T Q = np.linalg.cholesky(Lpd) elif decomp == "eigen": # use eigen decomposition to obtain Q: # L = Q Q.T = U D U.T = U D^1/2*D^1/2 U.T -> Q = U D^1/2 w = np.diag(np.sqrt(w)) Q = v @ w # compute euclidean cameras and structure R = R @ Q # compute k and get the motion parameters ki = [] for i in range(0, nof * 2): # get the true scaling factors for the images ki.append( np.linalg.norm(R[i]) ) # would be zf in the paper by Poelman and Kanade zf.append(1/np.linalg.norm(R[i]) ) R[i] = R[i] / np.linalg.norm(R[i]) S = np.linalg.inv(Q) @ S # construct one rotation matrix for each frame RotationMatrices = [] kiMean = [] # computing the mean of the scaling factors per "row" # m1 and n1 should have same scaling, due to noise they differ slightly # therefore the mean is used for i in range(0, nof): RotationMatrices.append( np.array([R[i, :], R[i + nof, :], np.cross(R[i, :], R[i + nof, :])]) ) kiMean.append((ki[i] + ki[i + nof]) / 2) # averaged scaling factors kiNew = [] for kii in kiMean: kiNew.append(kii / kiMean[normRot]) # set Frame 1 to world coordinate system -> first rotation matrix is identity matrix RotationToWorld = np.linalg.inv( RotationMatrices[normRot]) # generally 0 = identiy for i in range(0, nof): RotationMatrices[i] = RotationMatrices[i] @ RotationToWorld Shape = S.T @ RotationToWorld c = 1 s = 0 params = np.hstack((c, s)) return RotationMatrices, Shape, kiNew, params, Q, ki
[docs] def gT(self, a, b): """ Computes the constraint vector used in the metric upgrade process. Parameters ---------- a : np.ndarray An array of shape (frames, 3) representing camera motion vectors. b : np.ndarray An array of shape (frames, 3) representing camera motion vectors. Returns ------- np.ndarray A (frames, 6) array representing the constraint vectors for each frame. """ g = np.array( ( [ a[:, 0] * b[:, 0], a[:, 0] * b[:, 1] + a[:, 1] * b[:, 0], a[:, 0] * b[:, 2] + a[:, 2] * b[:, 0], a[:, 1] * b[:, 1], a[:, 1] * b[:, 2] + a[:, 2] * b[:, 1], a[:, 2] * b[:, 2], ] ) ) return g.T
[docs] def gT1(self, a, b): """ Constructs a vector used in solving the quadratic equations for factorization based on Tomasi/Kanade and Poelman/Kanade methods. Parameters ---------- a : np.ndarray A vector of shape (3,) representing the motion of the first frame. b : np.ndarray A vector of shape (3,) representing the motion of the first frame. Returns ------- np.ndarray A vector of shape (6,) containing the constraint terms. """ g1 = np.array( ( [ a[0] * b[0], a[0] * b[1] + a[1] * b[0], a[0] * b[2] + a[2] * b[0], a[1] * b[1], a[1] * b[2] + a[2] * b[1], a[2] * b[2], ] ) ) return g1.T
[docs] def reprojected_measurement_matrix(self, result): """ Reprojects the 3D structure back to 2D image space using the estimated camera rotations and scaling. Parameters ---------- result : tuple The factorization result containing: - rotations (list of np.ndarray): Camera rotation matrices. - structure (np.ndarray): 3D structure. - scalings (list of float): Scaling factors per image. - params (tuple): Affine parameters (c, s). Returns ------- tuple - reW (np.ndarray): The reprojected 2D measurement matrix of shape (2*num_views, num_points). - P_trunc (list of np.ndarray): List of projection matrices for each view. """ ki = result[2] c, s = result[3] eucShape = np.asarray(result[1]) eucRot = np.asarray(result[0]) uPts = [] vPts = [] # reproject 3D points into every image with its own rotation matrix P_trunc = [] for j,rot in enumerate(eucRot): Ka = ki[j] * np.array([[1 * c, 0], [s, 1]], dtype=np.float64) rePts = Ka @ rot[0:2, :].dot(eucShape.T) P_trunc.append(Ka @ rot[0:2, :]) uPts.append(rePts[0]) vPts.append(rePts[1]) # get new measurement matrix reW = np.vstack((np.asarray(uPts), np.asarray(vPts))) return reW, P_trunc
[docs] def remove_outliers_repro(self, W, reW, thFac): """ Removes outlier points based on reprojection error by applying a threshold. Parameters ---------- W : np.ndarray The original 2D measurement matrix of shape (2*num_views, num_points). reW : np.ndarray The reprojected 2D measurement matrix of shape (2*num_views, num_points). thFac : float The reprojection error threshold in pixels. Returns ------- tuple of np.ndarray - Wnew (np.ndarray): Cleaned measurement matrix after removing outliers. - WreNew (np.ndarray): Corresponding reprojected measurement matrix. """ # get error of all matches in all images allErr = self.signed_error(reW, W) oriDist = np.sqrt(allErr[0] ** 2 + allErr[1] ** 2) mask = oriDist < thFac numMatches = np.shape(W)[1] numImgs = int(len(mask) / numMatches) maskAll = mask[:numMatches] for i in range(numImgs - 1): i = i + 1 maskAll = maskAll * mask[numMatches * i: numMatches * (i + 1)] idx = np.argwhere(maskAll == False) Wnew = W.copy() WreNew = reW.copy() Wnew = np.delete(Wnew, idx, 1) WreNew = np.delete(reW, idx, 1) return Wnew, WreNew
[docs] def signed_error(self, reW, W): """ Computes the signed reprojection error between the reprojected and original measurement matrices. Parameters ---------- reW : np.ndarray The reprojected 2D measurement matrix of shape (2*num_views, num_points). W : np.ndarray The original 2D measurement matrix of shape (2*num_views, num_points). Returns ------- np.ndarray A (2, num_points) array containing the signed errors in u and v directions. """ nof = int(np.shape(W)[0] / 2) reSignErr = reW - W errU = np.hstack(reSignErr[0:nof]) errV = np.hstack(reSignErr[nof:]) err = np.vstack((errU, errV)) return err
[docs] def show(self): """ Prints the estimated rotation angles for each face. """ print(self.rot_angles)