# Copyright (c) 2025, Maxime Paschoud.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# (http://opensource.org/licenses/BSD-3-Clause)
#
# __author__ = "Maxime Paschoud, ETHZ: CMBM"
#
from pipeline import DAGNode
import numpy as np
import open3d as o3d
[docs]
class RegistrationByICP(DAGNode):
"""
Class for performing 3D point cloud registration using the Iterative Closest Point (ICP) algorithm.
This class is a wrapper for Open3D's ICP implementation.
Attributes
----------
threshold : float
Distance threshold for ICP correspondence matching. Default is 32.0.
trans_init : np.ndarray, shape (4, 4)
Initial transformation matrix used to start the ICP process. Default is identity.
pcds_reg : list of open3d.geometry.PointCloud
List of registered point clouds after running the eval() method.
"""
def __init__(self, threshold: float = 32., trans_init: np.ndarray = np.eye(4)):
"""
Initializes the ICP registration module.
Parameters
----------
threshold : float, optional
Maximum correspondence distance between point pairs. Default is 32.0.
trans_init : np.ndarray, shape (4, 4), optional
Initial transformation matrix. Default is the identity matrix.
"""
super().__init__()
self.threshold: float = threshold
self.trans_init: np.ndarray = trans_init
self.pcds_reg: [o3d.geometry.PointCloud] = None
[docs]
def eval(self, pcds: [np.ndarray]):
"""
Registers a list of point clouds using the ICP algorithm.
Parameters
----------
pcds : list of list of np.ndarray
Each sublist represents a group of point clouds for a given face (or view group).
Each point cloud is expected to be an (N, 3) NumPy array.
Returns
-------
list of open3d.geometry.PointCloud
Registered and merged point clouds.
"""
o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Error)
pcds_reg = []
transformations = [np.eye(4)]
for i, pcd_face in enumerate(pcds):
for j, pcd in enumerate(pcd_face):
pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(np.asarray(pcd)))
if j == 0 and i == 0:
pcds_reg.append(pcd)
continue
pcd_target = pcds_reg[-1]
# convert numpy array to open3d pcd
reg_p2p = o3d.pipelines.registration.registration_icp(
pcd, pcd_target, self.threshold, self.trans_init,
o3d.pipelines.registration.TransformationEstimationPointToPoint(),
o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=5000))
pcd.transform(reg_p2p.transformation)
pcds_reg[-1].points.extend(pcd.points)
transformations.append(reg_p2p.transformation)
self.pcds_reg = pcds_reg
return [pcds_reg]
[docs]
def save(self, path):
"""
Saves the registered point cloud to a PLY file.
Parameters
----------
path : str
Path to save the output PLY file. Must end with '.ply'.
Raises
------
ValueError
If `self.pcds_reg` is None or if the path does not end with '.ply'.
"""
if not path.endswith('.ply'):
raise ValueError(f"path should end with .ply. path is {path}")
if self.pcds_reg is None:
raise ValueError(f"self.pcds_reg is currently None. self.pcds_reg is typically set by running self.eval().")
o3d.io.write_point_cloud(path, self.pcds_reg[0])
[docs]
def show(self):
"""
Visualizes the registered point clouds using Open3D.
Raises
------
ValueError
If `self.pcds_reg` is None (i.e., `eval()` has not been run).
"""
if self.pcds_reg is None:
raise ValueError("Run the eval() method before calling show().")
o3d.visualization.draw_geometries(self.pcds_reg)
[docs]
class RegistrationByCorresp(DAGNode):
"""
Class for performing 3D point cloud registration using Procrustes alignment, based on known point correspondences.
Attributes
----------
pcds_tr : list of open3d.geometry.PointCloud
List of registered point clouds. Set after calling eval().
"""
def __init__(self):
"""
Initializes the Procrustes-based registration module.
"""
super().__init__()
self.pcds_tr = None
[docs]
def eval(self, pcds, camera_position, corresponding_points):
"""
Aligns a set of point clouds using Procrustes registration based on known correspondences.
Parameters
----------
pcds : list of list of np.ndarray
Each element in the outer list corresponds to a face/view. Each inner element is a point cloud (N, 3).
camera_position : list of dict
Camera intrinsic and rotation parameters for triangulation.
corresponding_points : list
List of corresponding 2D points across views for triangulation.
Returns
-------
list of open3d.geometry.PointCloud
Transformed point clouds aligned to a common frame.
"""
pcds_corresp = self.compute_point_cloud(camera_position, corresp)
# Test only for the first face
sample_n = 5000
pcd_ref = pcds_corresp[0][0]
pcds_tr = [o3d.geometry.PointCloud(o3d.utility.Vector3dVector(np.asarray(pcds[0][0])))]
for i, pcd in enumerate(pcds_corresp[0][1:]):
sampled_idx = np.random.randint(0, min(len(pcd_ref), len(pcd)), size=sample_n)
R, t = self.procrustes_3d(pcd_ref[sampled_idx], pcd[sampled_idx])
# Apply onto original pcds
pcd_new = pcds[0][i+1] @ R + t
# transform to open3D's format
pcd_new = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(np.asarray(pcd_new)))
pcds_tr.append(pcd_new)
self.pcds_tr = pcds_tr
return [pcds_tr]
[docs]
def procrustes_3d(self, X, Y):
"""
Computes the optimal rotation and translation to align Y to X using the Procrustes algorithm.
Parameters
----------
X : np.ndarray, shape (N, 3)
Reference 3D point cloud.
Y : np.ndarray, shape (N, 3)
Point cloud to align.
Returns
-------
R : np.ndarray, shape (3, 3)
Optimal rotation matrix.
t : np.ndarray, shape (3,)
Optimal translation vector.
"""
# Step 1: Center the point clouds by subtracting the centroids
centroid_X = np.mean(X, axis=0)
centroid_Y = np.mean(Y, axis=0)
X_centered = X - centroid_X
Y_centered = Y - centroid_Y
# Step 2: Compute the covariance matrix
H = np.dot(X_centered.T, Y_centered)
# Step 3: Perform SVD on the covariance matrix
U, _, Vt = np.linalg.svd(H)
# Step 4: Compute the optimal rotation matrix
R = np.dot(Vt.T, U.T)
# Ensure that the rotation matrix is proper (det(R) = 1)
if np.linalg.det(R) < 0:
Vt[2, :] *= -1
R = np.dot(Vt.T, U.T)
# Step 5: Compute the translation vector
t = centroid_X - np.dot(R, centroid_Y)
return R, t
[docs]
def compute_point_cloud(self, camera_position, dense_matches):
"""
Triangulates 3D points from matched 2D correspondences using affine camera models.
Parameters
----------
camera_position : list of dict
List containing intrinsic matrices and rotations for each camera/view.
dense_matches : list
2D point correspondences across image pairs.
Returns
-------
list of list of np.ndarray
Reconstructed 3D point clouds for each image pair.
"""
# iterate over all entries of self.config['selectedPairs']
dense_pcds = []
for j, matches_face in enumerate(dense_matches):
dense_pcds.append([])
for i in range(len(matches_face) - 1):
idx = (i, i + 1)
#centroid = np.mean(dense_matches[i], axis=1)
# intrinsics
k1 = camera_position[j]['intrinsics'][0][idx[0]]
k2 = camera_position[j]['intrinsics'][0][idx[1]]
c = camera_position[j]['intrinsics'][1][0]
s = camera_position[j]['intrinsics'][1][1]
# get truncated rotation matrices
rot1 = camera_position[j]['rotations'][idx[0]][:2]
rot2 = camera_position[j]['rotations'][idx[1]][:2]
# build affine camera matrices
P1 = k1*np.array([[c, 0], [s, 1]]).dot(rot1)
P2 = k2*np.array([[c, 0], [s, 1]]).dot(rot2)
# center matches / no need to further consider the translations
matches1 = matches_face[idx[0]].T - np.mean(matches_face[idx[0]], axis=0)[:, None]
matches2 = matches_face[idx[1]].T - np.mean(matches_face[idx[1]], axis=0)[:, None]
# perform linear triangulation (inhomogeneous equation system)
matches12 = np.vstack((matches1, matches2))
P12 = np.vstack((P1, P2))
X = self.solve_pinv(matches12, P12)
dense_pcds[-1].append(X)
return dense_pcds
[docs]
def solve_pinv(self, b, A):
"""
Solves the inhomogeneous linear system Ax = b using the Moore-Penrose pseudoinverse.
Parameters
----------
b : np.ndarray
Right-hand side matrix (observations).
A : np.ndarray
Design matrix.
Returns
-------
np.ndarray
Solution x such that Ax ≈ b.
"""
A_pinv = np.linalg.pinv(A)
x = A_pinv.dot(b)
return x.T