# Original code from Stefan Toeberg (2023)
# Copyright (c) 2023, Stefan Toeberg.
#
# Modified by Maxime Paschoud (2025)
# Copyright (c) 2025, Maxime Paschoud.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# (http://opensource.org/licenses/BSD-3-Clause)
#
# __author__ = "Stefan Toeberg, LUH: IMR; Maxime Paschoud, ETHZ: CMBM"
#
from pipeline import DAGNode
import numpy as np
[docs]
class EpipolarGeometry(DAGNode):
def __init__(self, numSamples=1000, kappa=2.5, stdFixed=1., searchRange=200, bucketing=False, stdEstimation='adaptive', nSets=1, verbose=False, cost_function='sampson', robust_method="MLESAC"):
"""
Initialize the EpipolarGeometry module for robust fundamental matrix estimation.
Parameters
----------
numSamples : int, optional
Number of fundamental matrix estimations to perform (default is 1000).
kappa : float, optional
Multiplier for standard deviation thresholding (default is 2.5).
stdFixed : float, optional
Fixed standard deviation value if `stdEstimation` is 'fixed' (default is 1.0).
searchRange : int, optional
Search range used for robust cost modeling (default is 200).
bucketing : bool, optional
Whether to use spatial bucketing for match sampling (default is False).
stdEstimation : {'adaptive', 'fixed'}, optional
Method for standard deviation estimation (default is 'adaptive').
nSets : int, optional
Number of top fundamental matrices to keep (default is 1).
verbose : bool, optional
Whether to print intermediate results (default is False).
cost_function : str, optional
Cost function to evaluate match quality ('sampson' or 'symEpi') (default is 'sampson').
robust_method : str, optional
Robust estimation method to use ('RANSAC', 'MSAC', 'LMedS', 'MLESAC') (default is 'MLESAC').
"""
super().__init__()
self.kappa = kappa
self.searchRange = searchRange
self.stdFixed = stdFixed
self.stdEstimation = stdEstimation # adaptive or fixed
self.bucketing = bucketing # Boolean, True to choose the 4 points using bucketing techniques. False: random select
self.numSamples = numSamples # how many Fundamental matrix estimation we perform
self.nSets = nSets # how many best Fundamental matrices and inlier sets we want to consider
self.verbose = verbose # print intermediate results
self.cost_function = cost_function # distance function to evaluate quality of the reprojection
self.robust_method = robust_method # the method to perform how to detect good estimation (RANSAC etc...)
[docs]
def robust_sampson(self, Farr, pts1, pts2):
"""
Compute Sampson distances for each match and each fundamental matrix sample.
Parameters
----------
Farr : ndarray of shape (numSamples, 3, 3)
Fundamental matrices.
pts1 : ndarray of shape (n, 3)
Homogenized points from the first image.
pts2 : ndarray of shape (n, 3)
Homogenized points from the second image.
Returns
-------
sampsonDist : ndarray of shape (numSamples, n)
Computed Sampson distances.
"""
# compute all neccessary terms
a = np.einsum("ij,kjl->kil", pts2, Farr)
a = np.einsum("ijk,jk->ij", a, pts1)
asq = np.square(a)
temp1 = np.einsum("ijk,lk->ijl", Farr, pts1)
temp2 = np.einsum("ijk,lj->ikl", Farr, pts2)
b = np.square(temp1[:, 0, :])
c = np.square(temp1[:, 1, :])
d = np.square(temp2[:, 0, :])
e = np.square(temp2[:, 1, :])
# calculate Sampson distance for each point for each sample
sampsonDist = asq / (b + c + d + e)
return sampsonDist
[docs]
def eval(self, matches):
"""
Perform robust estimation of fundamental matrices for multiple stereo pairs.
Parameters
----------
matches : list of tuples
Each tuple contains matched 2D points (pts1, pts2) for one stereo pair.
Returns
-------
list
A list of dictionaries or a list of lists of dictionaries containing:
- 'pts1', 'pts2': inlier point coordinates
- 'F': estimated fundamental matrix
- 'cost': model cost
- 'std': standard deviation of residuals
"""
# numSamples = self.numSamples
# nSets = self.nSets
# imageSize = np.shape(self.stereoPairs[0][0])[0]
print(f"In epipolar matches len: {len(matches)}")
self.epipolarSets = []
# len(matches) equals number of stereo pairs
# compute robust Fa for every stereo pair
print(f"Epipolar len of matches: {len(matches), len(matches[0])}")
for match in matches:
pts1 = match[0]
pts2 = match[1]
if len(pts1) < 4:
raise Exception('Not enough matches to compute Fa.')
# fSets are the n best sets of the SAC routine
fSets = self.estimate_robust_affine_fundamental_matrix(
pts1, pts2, self.numSamples)
# get the number of inliers
inMax = fSets[0][0]
# estimate the number of numSamples to achieve a probability of 99% for a good set
eps = 1-len(inMax)/len(pts1)
# pGood = 1-(1-(1-eps)**4)**num_samples, underlying formula
pGood = 0.99
ns = np.log(1-pGood)/(np.log(1-(1-eps)**4))
# add a margin of 500 sets to account for degenerate sets (task is not time sensitive)
# and check if more samples are needed
if (ns+500) > self.numSamples:
if np.round(ns) > 10000:
raise Exception('Too much outliers in the detected matches for stereo pair {}.\n'
'Number of computed sample capped to 10000.'.format(i+1))
ns = 10000
else:
ns = np.round(ns)+500
fSets = self.estimate_robust_affine_fundamental_matrix(
pts1, pts2, ns)
nSets = []
for n in range(self.nSets):
epiSet = {}
epiSet['pts1'] = fSets[n][0]
epiSet['pts2'] = fSets[n][1]
epiSet['F'] = fSets[n][2]
epiSet['cost'] = fSets[n][3]
epiSet['std'] = fSets[n][-1]
nSets.append(epiSet)
# in normal mode, when only the best sample is needed, directly append dict
if self.nSets == 1:
self.epipolarSets.append(epiSet)
else:
self.epipolarSets.append(nSets)
"""
if self.verbose == True:
# compute and print the residuals for the best F and the corresponding inliers as the rmse
if self.cost_function == 'sampson':
_, _, rmseResiduals, _ = dist.sampson_distance(
nSets[0]['pts1'], nSets[0]['pts2'], nSets[0]['F'])
elif self.cost_function == 'symEpi':
_, _, rmseResiduals, _ = dist.sym_epi_dist(
nSets[0]['pts1'], nSets[0]['pts2'], nSets[0]['F'])
print('{} / {} ({:.2f}% inliers), '.format(len(inMax),
len(pts1),
(len(inMax)/len(pts1))*10**2),
end='', flush=True)
print('RMSE({}) = {:.4f} px²\n'.format(
self.cost_function, rmseResiduals))
"""
return [self.epipolarSets]
[docs]
def estimate_robust_affine_fundamental_matrix(self, pts1, pts2, numSamples):
"""
Estimate the best fundamental matrices using robust methods.
Parameters
----------
pts1 : ndarray of shape (n, 2)
Points from the first image.
pts2 : ndarray of shape (n, 2)
Points from the second image.
numSamples : int
Number of models to estimate.
Returns
-------
setsIn : list
List of tuples (inPts1, inPts2, F, cost, std) for the best fundamental matrices.
"""
method = self.robust_method
# compute numSamples of models (fundamental matrices)
Farr = self.tensor_affine_fundamental_matrix(
pts1, pts2, numSamples)
if method == "RANSAC":
setsIn = self.ransac(Farr, pts1, pts)
elif method == "MSAC":
setsIn = self.msac(Farr, pts1, pts2)
elif method == "LMedS":
setsIn = self.lmeds(Farr, pts1, pts2)
elif method == "MLESAC":
setsIn = self.mlesac(Farr, pts1, pts2)
return setsIn
[docs]
def tensor_affine_fundamental_matrix(self, pts1, pts2, numSamples):
"""
Generate fundamental matrices using the affine gold standard algorithm.
Parameters
----------
pts1 : ndarray of shape (n, 2)
Points from the first image.
pts2 : ndarray of shape (n, 2)
Points from the second image.
numSamples : int
Number of samples to compute.
Returns
-------
F : ndarray of shape (numSamples, 3, 3)
Fundamental matrices.
"""
# number of points
numPoints = len(pts1)
if self.bucketing == True:
# if enabled, use bucketing technique to select the four matches from which F is computed
points1 = np.empty((numSamples, 4, 2))
points2 = np.empty((numSamples, 4, 2))
for i in range(numSamples):
# select samples of 4 matches, distributed over the image space
#points1[i], points2[i] = utils.bucketing_sac(pts1, pts2, size, 4)
break
else:
# if False, just get 4 random correspondences for each sample
idx = []
for i in range(int(numSamples)):
idx.append(np.random.choice(range(numPoints), 4, replace=False))
idx = np.reshape(np.asarray(idx), (int(numSamples), 4))
points1 = pts1[idx]
points2 = pts2[idx]
# adjust the order of points according to Hartley and Zisserman MVG
finalSets = np.dstack((points2[:, :, 0:2], points1[:, :, 0:2]))
# compute centroids of the point sets in the 3D array
n = np.shape(finalSets)[2]
centroids = np.einsum("ijk->ik", finalSets) / n
centroidsShaped = np.reshape((centroids), (int(numSamples), 1, n))
centroidsShapedTiled = np.tile(centroidsShaped, (1, n, 1))
# solve for F affine
A = finalSets - centroidsShapedTiled
U, s, VT = np.linalg.svd(A)
f = VT[:, 3, :]
# extract values and build F
F = np.zeros((np.shape(f)[0], 3, 3))
F[:, 0, 2] = f[:, 0]
F[:, 1, 2] = f[:, 1]
F[:, 2, 0] = f[:, 2]
F[:, 2, 1] = f[:, 3]
F[:, 2, 2] = -np.einsum("ij,ij->i", f, centroids)
return F
[docs]
def ransac(self, Farr, pts1, pts2):
"""
Estimate the fundamental matrix using RANSAC.
Parameters
----------
Farr : ndarray of shape (numSamples, 3, 3)
Sampled fundamental matrices.
pts1 : ndarray of shape (n, 2)
Points from the first image.
pts2 : ndarray of shape (n, 2)
Points from the second image.
Returns
-------
setsIn : list
List of tuples (inPts1, inPts2, F, cost, std) sorted by inlier count.
"""
nSets = self.nSets
errorArr, std, threshold, _ = self.compute_costs_thresholds(
pts1, pts2, Farr)
# check for which points (inliers) cost function is smaller than threshold
mask = errorArr < threshold
# count number of inliers
trueIndices = np.count_nonzero(mask, 1)
# find the n models (n samples) with highest number of inliers
bestSamples = np.argpartition(trueIndices, -nSets)[-nSets:]
# get indices of inliers corresponding to the best n models
goodIndicesSet = mask[bestSamples]
# inlier sets
inPts1 = []
inPts2 = []
Fin = []
rhoIn = []
# get the best n sets unsorted
for i in range(nSets):
inPts1.append(pts1[goodIndicesSet[i]])
inPts2.append(pts2[goodIndicesSet[i]])
Fin.append(Farr[bestSamples[i]])
rhoIn.append(np.sum(goodIndicesSet[i]))
# sort sets and put them in list
stdArr = np.repeat(std, nSets)
setsIn = sorted(
zip(inPts1, inPts2, Fin, rhoIn, stdArr), key=lambda x: len(x[0]), reverse=True
)
setsIn.append(std)
return setsIn
[docs]
def lmeds(self, Farr, pts1, pts2):
"""
Estimate the fundamental matrix using the Least Median of Squares (LMedS) method.
Parameters
----------
Farr : ndarray of shape (numSamples, 3, 3)
Sampled fundamental matrices.
pts1 : ndarray of shape (n, 2)
Points from the first image.
pts2 : ndarray of shape (n, 2)
Points from the second image.
Returns
-------
setsIn : list
List of tuples (inPts1, inPts2, F, cost, std) sorted by cost.
"""
nSets = self.nSets
# compute costs
errorArr, std, threshold, _ = self.compute_costs_thresholds(
pts1, pts2, Farr)
rho = np.median(errorArr, 1)
# find the nSets best sets (not sorted yet)
bestSets = np.argpartition(rho, nSets)[:nSets]
newMask = errorArr[bestSets] < threshold
# inlier sets
inPts1 = []
inPts2 = []
Fin = []
rhoIn = []
# get the corresponding inliers and fundamental matrices (sets not sorted yet)
for i in range(nSets):
inPts1.append(pts1[newMask[i]])
inPts2.append(pts2[newMask[i]])
Fin.append(Farr[bestSets[i]])
rhoIn.append(rho[bestSets[i]])
# add the estimated standard deviation to every set
stdArr = np.repeat(std, nSets)
# sort the sets and zip the lists
setsIn = sorted(
zip(inPts1, inPts2, Fin, rhoIn, stdArr), key=lambda x: x[3], reverse=False
)
return setsIn
[docs]
def msac(self, Farr, pts1, pts2):
"""
Estimate the fundamental matrix using the MSAC algorithm.
Parameters
----------
Farr : ndarray of shape (numSamples, 3, 3)
Sampled fundamental matrices.
pts1 : ndarray of shape (n, 2)
Points from the first image.
pts2 : ndarray of shape (n, 2)
Points from the second image.
Returns
-------
setsIn : list
List of tuples (inPts1, inPts2, F, cost, std) sorted by cost.
"""
nSets = self.nSets
errorArr, std, threshold, _ = self.compute_costs_thresholds(
pts1, pts2, Farr)
# check for which points (outliers) cost function is greater than threshold
mask = errorArr >= threshold
# compute cost, score of outliers = threshold, score of inliers = cost function
errorArr[mask] = threshold
rho = np.sum(errorArr, 1)
# find the nSets best models with lowest costs
bestSets = np.argpartition(rho, nSets)[:nSets]
newMask = errorArr[bestSets] < threshold
# inlier sets
inPts1 = []
inPts2 = []
Fin = []
rhoIn = []
# get the best n sets unsorted
for i in range(nSets):
inPts1.append(pts1[newMask[i]])
inPts2.append(pts2[newMask[i]])
Fin.append(Farr[bestSets[i]])
rhoIn.append(rho[bestSets[i]])
stdArr = np.repeat(std, nSets)
setsIn = sorted(
zip(inPts1, inPts2, Fin, rhoIn, stdArr), key=lambda x: x[3], reverse=False
)
return setsIn
[docs]
def mlesac(self, Farr, pts1, pts2):
"""
Estimate the fundamental matrix using the MLESAC algorithm.
Parameters
----------
Farr : ndarray of shape (numSamples, 3, 3)
Sampled fundamental matrices.
pts1 : ndarray of shape (n, 2)
Points from the first image.
pts2 : ndarray of shape (n, 2)
Points from the second image.
Returns
-------
setsIn : list
List of tuples (inPts1, inPts2, F, cost, std) sorted by likelihood.
"""
vDist = self.searchRange
nSets = self.nSets
errorArr, std, threshold, _ = self.compute_costs_thresholds(
pts1, pts2, Farr)
numMatches = len(pts1)
# compute the mixture parameter
gamma = self.max_expectation_gamma_robust(
std, errorArr, numMatches, vDist, iters=5)
# compute logarithmic probability function of mixed distributions for each point
Pi = gamma * (1 / (np.sqrt(2 * np.pi) * std)) * \
np.exp(-errorArr / (2 * std**2))
Po = (1 - gamma) / vDist
L = -np.log(Pi + Po)
# compute errorArr function rho, which sums logs
rho = np.sum(L, 1)
bestSets = np.argpartition(rho, nSets)[:nSets]
newMask = errorArr[bestSets] < threshold
# inlier sets
inPts1 = []
inPts2 = []
Fin = []
rhoIn = []
# get the best n sets unsorted
for i in range(nSets):
inPts1.append(pts1[newMask[i]])
inPts2.append(pts2[newMask[i]])
Fin.append(Farr[bestSets[i]])
rhoIn.append(rho[bestSets[i]])
stdArr = np.repeat(std, nSets)
setsIn = sorted(
zip(inPts1, inPts2, Fin, rhoIn, stdArr), key=lambda x: x[3], reverse=False
)
return setsIn
[docs]
def compute_costs_thresholds(self, pts1, pts2, Farr):
"""
Compute the cost values and inlier threshold.
Parameters
----------
pts1 : ndarray of shape (n, 2)
Points from the first image.
pts2 : ndarray of shape (n, 2)
Points from the second image.
Farr : ndarray of shape (numSamples, 3, 3)
Fundamental matrices.
Returns
-------
errorArr : ndarray
Error array (numSamples x numPoints).
std : float
Estimated or fixed standard deviation.
threshold : float
Inlier threshold.
leastMedian : float
Minimum median value across all samples.
"""
cost_function = self.cost_function
numMatches = len(pts1)
# homogenize all points
arrOnes = np.ones((numMatches, 1))
pts1 = np.concatenate((pts1, arrOnes), 1)
pts2 = np.concatenate((pts2, arrOnes), 1)
# compute costs
if cost_function == "symEpi":
errorArr = dist.robust_residual_distance(Farr, pts1, pts2)
else:
errorArr = self.robust_sampson(Farr, pts1, pts2)
# compute medians of cost function
medians = np.median(errorArr, 1)
# find smallest median
idx = np.argmin(medians)
leastMedian = medians[idx]
# robust sigma estimation
std, threshold = self.compute_standard_deviation(
numMatches, leastMedian)
return errorArr, std, threshold, leastMedian
[docs]
def compute_standard_deviation(self, numMatches, leastMedian):
"""
Compute standard deviation and inlier threshold.
Parameters
----------
numMatches : int
Number of matches.
leastMedian : float
Least median of residuals.
Returns
-------
std : float
Estimated or fixed standard deviation.
threshold : float
Threshold for inlier classification.
"""
if self.stdEstimation == "adaptive":
# length of the parameter vector is 4 due to the affine epipolar geometry
lenPv = 4
# formula according to Leroy and Rousseaux
std = 1.4826 * (1 + 5 / (numMatches - lenPv)) * np.sqrt(leastMedian)
threshold = (self.kappa * std) ** 2
else:
std = self.stdFixed
threshold = (self.kappa * std) ** 2
return std, threshold
[docs]
def max_expectation_gamma_robust(self, std, errorArr, numMatches, vDist, iters=5):
"""
Estimate mixture parameter gamma via EM for MLESAC.
Parameters
----------
std : float
Estimated standard deviation.
errorArr : ndarray
Error values (squared distances).
numMatches : int
Number of matches.
vDist : float
Search range (for uniform outlier model).
iters : int, optional
Number of EM iterations (default is 5).
Returns
-------
gamma : ndarray of shape (numSamples, 1)
Mixture parameter gamma values.
"""
# gamma is the start value of the mixture parameter
gamma = 0.5
# expectation maximization algorithm according to Torr: MLESAC
# errorArr is already squared
for j in range(iters):
Pi = (
gamma
* (1 / (np.sqrt(2 * np.pi) * std))
* np.exp(-errorArr / (2 * std**2))
)
Po = (1 - gamma) / vDist
z = Pi / (Pi + Po)
z_s = np.sum(z, 1)
gamma = z_s / numMatches
gamma = np.reshape(gamma, (len(errorArr), 1))
return gamma
[docs]
def single_affine_fundamental_matrix(self, matches):
"""
Estimate a single fundamental matrix from matched points.
Parameters
----------
matches : ndarray of shape (n, 4)
Matched keypoints or points between two images. Each element of the array represents one corresponding point (x1, y1, x2, y2)
Returns
-------
F : ndarray of shape (3, 3)
The estimated affine fundamental matrix
"""
# adjust the order of points as defined by Hartley and Zisserman
X = matches[:, [2, 3, 0, 1]]
# compute the centroid
n = len(X)
centroid = np.sum(X, axis=0) / n
# compute the (n x 4) matrix A of the centered matches
A = X - np.tile(centroid, (n, 1))
# the solution vector is given by the singular vector corresponding to the smallest
# singular value of matrix A which is the last row of matrix VT
U, s, VT = np.linalg.svd(A)
f = VT[-1, :]
# reshape values to obtain the affine fundamental matrix accoring to Hartley and Zisserman MVG
F = np.zeros((3, 3))
F[0, 2] = f[0]
F[1, 2] = f[1]
F[2, 0] = f[2]
F[2, 1] = f[3]
F[2, 2] = -np.dot(f, centroid)
return F