# Original code from Stefan Toeberg (2023)
# Copyright (c) 2023, Stefan Toeberg.
#
# Modified by Maxime Paschoud (2025)
# Copyright (c) 2025, Maxime Paschoud.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# (http://opensource.org/licenses/BSD-3-Clause)
#
# __author__ = "Stefan Toeberg, LUH: IMR; Maxime Paschoud, ETHZ"
# __description__ = """
# factorization methods and related algorithms regarding
# the basic orthographic approach and the extended
# scaled orthographic approach
# """
#
from pipeline import DAGNode
import warnings
import numpy as np
[docs]
class Factorization(DAGNode):
"""
Builds a measurement matrix from corresponding points and factorizes it to recover the movement of the cameras.
Attributes
----------
reproErrorTh : float
Threshold for the reprojection error.
max_matches : int
Maximum number of matches to consider (default is 0, meaning no limit).
batch_size : int
Number of images to process in each batch (default is 4).
rot_angles : list
List of rotation angles for the camera movements. Set in eval().
"""
def __init__(self, reproErrorTh, normalized_view, max_matches=0, batch_size=4):
"""
Parameters
----------
reproErrorTh : float
The reprojection error threshold for outlier removal.
normalized_view : int
Index of the normalized view to be used for scaling.
max_matches : int, optional
Maximum number of matches to consider (default is 0, no limit).
batch_size : int, optional
Number of images to process in each batch (default is 4).
Warns
-----
Warning
If batch_size is less than 3, a warning is issued.
"""
super().__init__()
self.reproErrorTh = reproErrorTh
self.normalized_view = normalized_view
self.max_matches = max_matches
if batch_size < 3:
warnings.warn("It is recommended to have a batch_size of at least 3.")
self.batch_size = batch_size
self.rot_angles = []
[docs]
def eval(self, matches):
"""
Computes camera parameters from corresponding matches using factorization on batches of images.
The camera movements for each batch are propagated to the rest of the face.
The first camera of each face is used as relative origin with zero tilt.
The tilt angles are forced positive, so the image collection should have monotonically increasing tilt angles.
Parameters
----------
matches : list of list of numpy.ndarray
Corresponding points across the image collection. The first list level represents faces.
Returns
-------
list
A list containing the computed camera parameters for each face.
"""
print("Start of Factorization Node...")
camera_param = []
for matches_face in matches:
camera_param.append({})
matches_face = np.asarray(matches_face)
m, n, _ = matches_face.shape
if self.max_matches != 0:
sampled_indices = np.random.choice(n, self.max_matches, replace=False)
matches_face = matches_face[:, sampled_indices, :]
# compute camera movements batch_size images at a time
i = 0
while i < len(matches_face):
next_i = i + self.batch_size - 1
if len(matches_face) - next_i < self.batch_size:
meas_mat, centroids = self.build_measurement_matrix(matches_face[i:])
next_i = len(matches_face)
else:
meas_mat, centroids = self.build_measurement_matrix(matches_face[i:(i + self.batch_size)])
cameras_face = self.factorize(meas_mat, centroids)
# rotate in positive direction
angles = self.rotation_angles_arctan(cameras_face['rotations'][1])
if np.max(np.abs(angles)) != np.max(angles):
# inverse rotation matrix
cameras_face['rotations'] = cameras_face['rotations'].transpose(0, 2, 1)
if i == 0:
camera_param[-1] = cameras_face
else: # propagate
prev_rot = camera_param[-1]['rotations'][-1]
prev_scaling = camera_param[-1]['intrinsics'][0][-1]
cameras_face['rotations'] @= prev_rot
cameras_face['intrinsics'][0] *= prev_scaling
camera_param[-1]['rotations'] = np.concatenate((camera_param[-1]['rotations'], cameras_face['rotations'][1:]))
camera_param[-1]['intrinsics'][0] = np.concatenate((camera_param[-1]['intrinsics'][0], cameras_face['intrinsics'][0][1:]))
i = next_i
self.camera_param = camera_param
print("Estimated rotation angles:")
rot_angles = []
for i, param_face in enumerate(camera_param):
print(f"Face {i}:")
rot_angles.append(self.print_rotation_angles(param_face['rotations']))
self.rot_angles = rot_angles
print("End of Factorization.")
return [camera_param]
[docs]
def build_measurement_matrix(self, matches):
"""
Constructs a measurement matrix from corresponding points in multiple images.
Parameters
----------
matches : numpy.ndarray
Array of shape (m, n, 2), where m is number of images, n is number of points, and 2 corresponds to (x, y) coordinates.
Returns
-------
tuple
meas_mat : numpy.ndarray
The constructed measurement matrix of shape (2*m, n).
centers : numpy.ndarray
The centroids of the matches used for centering the points, shape (m, 2).
"""
# center the data
centers = np.mean(matches, axis=1)
matches_centered = matches - centers[:, None, :]
# construct registered Measurement Matrix
meas_mat = np.empty((2*len(matches_centered), matches_centered.shape[1]))
meas_mat[:matches_centered.shape[0], :] = matches_centered[..., 0]
meas_mat[matches_centered.shape[0]:, :] = matches_centered[..., 1]
return meas_mat, centers
[docs]
def factorize(self, measurementMatrix, centroids):
"""
Factorizes the measurement matrix to estimate camera rotations and intrinsics.
Parameters
----------
measurementMatrix : numpy.ndarray
The measurement matrix to factorize.
centroids : numpy.ndarray
Centroids of matched points for adjustment during factorization.
Returns
-------
dict
Dictionary containing:
- 'rotations': numpy.ndarray of estimated rotation matrix.
- 'intrinsics': list containing normalized scaling factors and parameters.
"""
W = measurementMatrix
resultFactor = self.factorization_sc(W, decomp='eigen', normRot=0)
reproW, _ = self.reprojected_measurement_matrix(resultFactor)
reproErrW = self.signed_error(reproW, W)
max_iter = 10
i = 0
while self.compute_rmse(reproErrW.flatten()) > self.reproErrorTh and i < max_iter:
W, _ = self.remove_outliers_repro(W, reproW, self.reproErrorTh)
# correct the registration and translate the centroid in the origin
W = W - np.vstack(np.mean(W, axis=1))
# correct the previously computed centroid
centroids = centroids + np.hstack((np.vstack(np.mean(W, axis=1))[:int(len(
W)/2)], np.vstack(np.mean(W, axis=1))[int(len(W)/2):]))
# update result
resultFactor = self.factorization_sc(W, decomp="eigen", normRot=0)
reproW, _ = self.reprojected_measurement_matrix(resultFactor)
reproErrW = self.signed_error(reproW, W)
i += 1
camDict = {}
camDict['rotations'] = np.asarray(resultFactor[0])
camDict['intrinsics'] = [resultFactor[2] / resultFactor[2][self.normalized_view], resultFactor[3]]
print(f"{np.shape(measurementMatrix)[1]-np.shape(W)[1]} matche(s) removed due to the reprojection error threshold of {self.reproErrorTh} px.")
rot_angles = self.print_rotation_angles(resultFactor[0], name='SC')
scaling_factors = self.print_scaling_factors(resultFactor[2])
print('RMSE of the reprojection error is given as '
'{:.4f} px.'.format(self.compute_rmse(reproErrW.flatten())))
return camDict
[docs]
def print_scaling_factors(self, scaling_factors):
"""
Normalizes and prints the relative scaling factors of each view.
Parameters
----------
scaling_factors : numpy.ndarray
Scaling factors for each camera view.
Returns
-------
numpy.ndarray
Normalized scaling factors with respect to the normalized view.
"""
s = scaling_factors / scaling_factors[self.normalized_view]
print('Relative scaling factors of each view:\n{}\n'.format(s, end='', flush=True))
return s
[docs]
def print_rotation_angles(self, R, name='SC'):
"""
Prints rotation angles for each rotation matrix in degrees.
Parameters
----------
R : numpy.ndarray
Rotation matrices array, shape (num_cameras, 3, 3).
name : str, optional
Name to prefix the print statements (default is 'SC').
Returns
-------
list of numpy.ndarray
List of rotation angles [phiX, phiY, phiZ] in degrees for each rotation matrix.
"""
print(f"{name} Factorization - Rotation Angles [phiX, phiY, phiZ]:")
rotAngles = []
for i,r in enumerate(R):
angles = self.rotation_angles_arctan(r)
print(f"Image {i}: {angles}")
rotAngles.append(angles)
return rotAngles
[docs]
def rotation_angles_arctan(self, R):
"""
Extract rotation angles (in degrees) from a rotation matrix using arctangent.
Parameters
----------
R : numpy.ndarray
A 3x3 rotation matrix.
Returns
-------
numpy.ndarray
Rotation angles [x, y, z] in degrees.
"""
# extract the rotation angles from a rotation matrix
x = np.arctan2(R[2, 1], R[2, 2])
y = np.arctan2(-R[2, 0], np.sqrt(R[2, 1]**2 + R[2, 2]**2))
z = np.arctan2(R[1, 0], R[0, 0])
return np.array([x, y, z]) / np.pi * 180
[docs]
def compute_rmse(self, distances):
"""
Computes the root mean square error (RMSE) from a list of distances.
Parameters
----------
distances : array-like
List or array of distance errors.
Returns
-------
float
The computed RMSE value.
"""
n = len(distances)
rmse = np.sqrt((1/n) * np.sum((distances)**2))
return rmse
[docs]
def factorization_sc(self, W, decomp="eigen", normRot=0):
"""
Performs factorization using the scaling and camera motion model described by Poelman and Kanade.
Parameters
----------
W : numpy.ndarray
The measurement matrix containing point correspondences across frames.
decomp : str, optional
Decomposition method to use: 'eigen' or 'cholesky' (default is 'eigen').
normRot : int, optional
Index of the camera used as reference for normalization (default is 0).
Returns
-------
tuple
RotationMatrices : list of numpy.ndarray
List of rotation matrices for each frame.
Shape : numpy.ndarray
The estimated 3D shape matrix.
kiNew : list
Normalized scaling factors for each frame.
params : numpy.ndarray
Additional parameters (2-element array).
Q : numpy.ndarray
The matrix used for correction.
ki : list
Original scaling factors for each frame.
"""
nof = int(len(W[:, 0]) / 2)
U, s, V = np.linalg.svd(W)
U = U[:, 0:3]
s = s[0:3]
s12 = np.diag(np.sqrt(s))
V = V[0:3, :]
R = U @ s12
S = s12 @ V
# build linear system of scaled orthographic constraints Ax=b
iT = R[0:nof]
jT = R[nof:]
# build linear system of metric constraints Ax=b
G = np.vstack((self.gT(iT, iT) - self.gT(jT, jT),
self.gT(iT, jT), self.gT1(iT[0, :], iT[0, :])))
b = np.vstack((np.zeros((nof * 2, 1)), 1))
b = np.squeeze(np.array(b).T)
# solve with Moore-Penrose Pseudoinverse
I = np.linalg.pinv(G) @ b
# build the symmetric matrix L
L = np.array([[I[0], I[1], I[2]], [I[1], I[3], I[4]], [I[2], I[4], I[5]]])
# eigendecompose L to compute the nearest positive definite matrix
w, v = np.linalg.eig(L)
# set negative eigenvalues to eps>0 to get closest positive definite
# matrix in frobenius norm
w = w.clip(min=0)
w[w == 0] = 10 ** (-9)
if decomp == "cholesky":
# use cholesky decomposition to obtain Q
# get L positive definite
Lpd = v @ np.diag(w) @ v.T
Q = np.linalg.cholesky(Lpd)
elif decomp == "eigen":
# use eigen decomposition to obtain Q:
# L = Q Q.T = U D U.T = U D^1/2*D^1/2 U.T -> Q = U D^1/2
w = np.diag(np.sqrt(w))
Q = v @ w
# compute euclidean cameras and structure
R = R @ Q
# compute k and get the motion parameters
ki = []
for i in range(0, nof * 2):
# get the true scaling factors for the images
ki.append(
np.linalg.norm(R[i])
) # would be zf in the paper by Poelman and Kanade zf.append(1/np.linalg.norm(R[i]) )
R[i] = R[i] / np.linalg.norm(R[i])
S = np.linalg.inv(Q) @ S
# construct one rotation matrix for each frame
RotationMatrices = []
kiMean = []
# computing the mean of the scaling factors per "row"
# m1 and n1 should have same scaling, due to noise they differ slightly
# therefore the mean is used
for i in range(0, nof):
RotationMatrices.append(
np.array([R[i, :], R[i + nof, :], np.cross(R[i, :], R[i + nof, :])])
)
kiMean.append((ki[i] + ki[i + nof]) / 2)
# averaged scaling factors
kiNew = []
for kii in kiMean:
kiNew.append(kii / kiMean[normRot])
# set Frame 1 to world coordinate system -> first rotation matrix is identity matrix
RotationToWorld = np.linalg.inv(
RotationMatrices[normRot]) # generally 0 = identiy
for i in range(0, nof):
RotationMatrices[i] = RotationMatrices[i] @ RotationToWorld
Shape = S.T @ RotationToWorld
c = 1
s = 0
params = np.hstack((c, s))
return RotationMatrices, Shape, kiNew, params, Q, ki
[docs]
def gT(self, a, b):
"""
Computes the constraint vector used in the metric upgrade process.
Parameters
----------
a : np.ndarray
An array of shape (frames, 3) representing camera motion vectors.
b : np.ndarray
An array of shape (frames, 3) representing camera motion vectors.
Returns
-------
np.ndarray
A (frames, 6) array representing the constraint vectors for each frame.
"""
g = np.array(
(
[
a[:, 0] * b[:, 0],
a[:, 0] * b[:, 1] + a[:, 1] * b[:, 0],
a[:, 0] * b[:, 2] + a[:, 2] * b[:, 0],
a[:, 1] * b[:, 1],
a[:, 1] * b[:, 2] + a[:, 2] * b[:, 1],
a[:, 2] * b[:, 2],
]
)
)
return g.T
[docs]
def gT1(self, a, b):
"""
Constructs a vector used in solving the quadratic equations for factorization
based on Tomasi/Kanade and Poelman/Kanade methods.
Parameters
----------
a : np.ndarray
A vector of shape (3,) representing the motion of the first frame.
b : np.ndarray
A vector of shape (3,) representing the motion of the first frame.
Returns
-------
np.ndarray
A vector of shape (6,) containing the constraint terms.
"""
g1 = np.array(
(
[
a[0] * b[0],
a[0] * b[1] + a[1] * b[0],
a[0] * b[2] + a[2] * b[0],
a[1] * b[1],
a[1] * b[2] + a[2] * b[1],
a[2] * b[2],
]
)
)
return g1.T
[docs]
def reprojected_measurement_matrix(self, result):
"""
Reprojects the 3D structure back to 2D image space using the estimated camera rotations and scaling.
Parameters
----------
result : tuple
The factorization result containing:
- rotations (list of np.ndarray): Camera rotation matrices.
- structure (np.ndarray): 3D structure.
- scalings (list of float): Scaling factors per image.
- params (tuple): Affine parameters (c, s).
Returns
-------
tuple
- reW (np.ndarray): The reprojected 2D measurement matrix of shape (2*num_views, num_points).
- P_trunc (list of np.ndarray): List of projection matrices for each view.
"""
ki = result[2]
c, s = result[3]
eucShape = np.asarray(result[1])
eucRot = np.asarray(result[0])
uPts = []
vPts = []
# reproject 3D points into every image with its own rotation matrix
P_trunc = []
for j,rot in enumerate(eucRot):
Ka = ki[j] * np.array([[1 * c, 0], [s, 1]], dtype=np.float64)
rePts = Ka @ rot[0:2, :].dot(eucShape.T)
P_trunc.append(Ka @ rot[0:2, :])
uPts.append(rePts[0])
vPts.append(rePts[1])
# get new measurement matrix
reW = np.vstack((np.asarray(uPts), np.asarray(vPts)))
return reW, P_trunc
[docs]
def remove_outliers_repro(self, W, reW, thFac):
"""
Removes outlier points based on reprojection error by applying a threshold.
Parameters
----------
W : np.ndarray
The original 2D measurement matrix of shape (2*num_views, num_points).
reW : np.ndarray
The reprojected 2D measurement matrix of shape (2*num_views, num_points).
thFac : float
The reprojection error threshold in pixels.
Returns
-------
tuple of np.ndarray
- Wnew (np.ndarray): Cleaned measurement matrix after removing outliers.
- WreNew (np.ndarray): Corresponding reprojected measurement matrix.
"""
# get error of all matches in all images
allErr = self.signed_error(reW, W)
oriDist = np.sqrt(allErr[0] ** 2 + allErr[1] ** 2)
mask = oriDist < thFac
numMatches = np.shape(W)[1]
numImgs = int(len(mask) / numMatches)
maskAll = mask[:numMatches]
for i in range(numImgs - 1):
i = i + 1
maskAll = maskAll * mask[numMatches * i: numMatches * (i + 1)]
idx = np.argwhere(maskAll == False)
Wnew = W.copy()
WreNew = reW.copy()
Wnew = np.delete(Wnew, idx, 1)
WreNew = np.delete(reW, idx, 1)
return Wnew, WreNew
[docs]
def signed_error(self, reW, W):
"""
Computes the signed reprojection error between the reprojected and original measurement matrices.
Parameters
----------
reW : np.ndarray
The reprojected 2D measurement matrix of shape (2*num_views, num_points).
W : np.ndarray
The original 2D measurement matrix of shape (2*num_views, num_points).
Returns
-------
np.ndarray
A (2, num_points) array containing the signed errors in u and v directions.
"""
nof = int(np.shape(W)[0] / 2)
reSignErr = reW - W
errU = np.hstack(reSignErr[0:nof])
errV = np.hstack(reSignErr[nof:])
err = np.vstack((errU, errV))
return err
[docs]
def show(self):
"""
Prints the estimated rotation angles for each face.
"""
print(self.rot_angles)