# Original code from Stefan Toeberg (2023)
# Copyright (c) 2023, Stefan Toeberg.
#
# Modified by Maxime Paschoud (2025)
# Copyright (c) 2025, Maxime Paschoud.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# (http://opensource.org/licenses/BSD-3-Clause)
#
# __author__ = "Stefan Toeberg, LUH: IMR; Maxime Paschoud, ETHZ"
#
from pipeline import DAGNode
import numpy as np
[docs]
class OptimizeEpipolarGeometry(DAGNode):
"""
Optimize epipolar geometry using local refinement methods.
This class performs iterative local optimization of fundamental matrices
and inliers obtained from robust estimation schemes (e.g., RANSAC, LMedS, MSAC, MLESAC).
Different cost functions (Sampson, symmetric epipolar) and robust methods
are supported.
Parameters
----------
verbose : bool, optional
If True, prints diagnostic information during optimization. Default is False.
costFunction : {'sampson', 'symEpi'}, optional
Cost function to use for evaluating epipolar geometry. Default is 'sampson'.
kappa : float, optional
Scaling factor for computing inlier thresholds. Default is 2.5.
searchRange : int, optional
Search range (in pixels) for disparity when evaluating guided matching. Default is 200.
robustMethod : {'LMedS', 'RANSAC', 'MSAC', 'MLESAC'}, optional
Robust estimation method for local optimization. Default is 'MLESAC'.
stdFixed : float, optional
Fixed value of the standard deviation used when `stdEstimation='fixed'`. Default is 1.0.
stdEstimation : {'adaptive', 'fixed'}, optional
Method to estimate standard deviation. If 'adaptive', a robust estimate
is computed per iteration; if 'fixed', uses `stdFixed`. Default is 'adaptive'.
"""
def __init__(self, verbose=False, costFunction='sampson', kappa=2.5, searchRange=200, robustMethod='MLESAC', stdFixed=1., stdEstimation='adaptive'):
super().__init__()
self.robustMethod = robustMethod
self.costFunction = costFunction # sampson
self.verbose = verbose
self.kappa = kappa
self.searchRange=searchRange
self.stdFixed = stdFixed
self.stdEstimation = stdEstimation
[docs]
def eval(self, epipolarSets, matches):
"""
Perform iterative local optimization over inlier matches.
Parameters
----------
epipolarSets : list of dict
List of dictionaries containing the fundamental matrix (`'F'`) and
corresponding inliers for each stereo pair, as estimated by a SAC scheme.
matches : list of dict
List of dictionaries containing matched points for each stereo pair.
Returns
-------
list of dict
Updated `epipolarSets` with optimized fundamental matrices and refined inliers.
"""
for j, epiSet in enumerate(epipolarSets):
npts1i, npts2i, Fi, i, rhoi, sigi = self.local_opt_affine(
matches[j], epiSet)
# select best result yielding the lowest cost/best score, perform local opt at least once
if self.robustMethod == 'RANSAC':
idxBestScore = np.argmax(rhoi[1:])+1
else:
idxBestScore = np.argmin(rhoi[1:])+1
# update the fundamental matrix and corresponding inliers
epiSet['F'] = Fi[idxBestScore]
epiSet['pts1'] = npts1i[idxBestScore]
epiSet['pts2'] = npts2i[idxBestScore]
"""
if self.verbose == True:
# compute and print the rmse errorArr of the chosen cost computed for all inliers
if self.costFunction == 'sampson':
_, _, rmseResiduals, _ = dist.sampson_distance(
epiSet['pts1'], epiSet['pts2'], epiSet['F'])
elif self.costFunction == 'symEpi':
_, _, rmseResiduals, _ = dist.sym_epi_dist(
epiSet['pts1'], epiSet['pts2'], epiSet['F'])
print('iters = {}, score/cost - {}: {:.4f} -> GM: {:.4f};\n'.format(idxBestScore,
self.robustMethod, rhoi[0], rhoi[idxBestScore]), end='', flush=True)
print('{:<24} RMSE({}) = {:.4f} px², {} matches'.format(
' ', self.costFunction, rmseResiduals, len(epiSet['pts1'])))
"""
self.epipolarSets = epipolarSets
return [epipolarSets]
[docs]
def local_opt_affine(self, matches, epiSet):
"""
Perform iterative local optimization of the best set yielded by the SAC scheme.
Guided matching is performed using the updated epipolar geometry until
convergence or a stopping criterion is reached.
Parameters
----------
matches : dict
Dictionary containing the matches in the stereo pair.
epiSet : dict
Dictionary containing the initial fundamental matrix (`'F'`) and
corresponding inliers from the SAC scheme.
Returns
-------
npts1i : list of ndarray
List of inlier points from image 1 across iterations.
npts2i : list of ndarray
List of inlier points from image 2 across iterations.
Fi : list of ndarray
Fundamental matrices estimated across iterations.
i : int
Number of iterations performed.
rho : list
Cost values across iterations (definition depends on chosen method).
stdi : list of float
Standard deviations of residuals at each iteration.
"""
if self.robustMethod == "LMedS":
return local_opt_lmeds(matches, epiSet)
elif self.robustMethod == "RANSAC":
return local_opt_ransac(matches, epiSet)
elif self.robustMethod == "MSAC":
return local_opt_msac(matches, epiSet)
elif self.robustMethod == "MLESAC":
return self.local_opt_mlesac(matches, epiSet)
else:
return self.local_opt_mlesac(matches, epiSet)
[docs]
def local_opt_lmeds(self, matches, epiSet):
"""
Local optimization using Least Median of Squares (LMedS).
Parameters
----------
matches : dict
Dictionary containing the matches in the stereo pair.
epiSet : dict
Dictionary containing the initial fundamental matrix and inliers.
Returns
-------
npts1i : list of ndarray
Inlier points in image 1 per iteration.
npts2i : list of ndarray
Inlier points in image 2 per iteration.
Fi : list of ndarray
Fundamental matrices estimated per iteration.
i : int
Number of iterations performed.
mediani : list of float
Median residuals across iterations.
stdi : list of float
Standard deviations estimated per iteration.
"""
costFunction = self.costFunction
stdi = []
stdi.append(epiSet["std"])
i = 0
Fi = []
npts1i = []
npts2i = []
mediani = []
# append the initial correspondences and fundamental matrix
F0 = epiSet["F"]
pts1 = epiSet["pts1"]
pts2 = epiSet["pts2"]
m1 = matches["pts1"]
m2 = matches["pts2"]
Fi.append(F0)
npts1i.append(pts1[:, :2])
npts2i.append(pts2[:, :2])
errorArr = dist.geometric_distance(
m1, m2, F0, costFunction, score="single")
leastMedian = np.median(errorArr)
mediani.append(leastMedian)
while True:
pp12 = np.hstack((npts1i[i][:, 0:2], npts2i[i][:, 0:2]))
Fglob = self.single_affine_fundamental_matrix(pp12)
# compute residuals
errorArr = dist.geometric_distance(
m1, m2, Fglob, costFunction, score="single")
# compute LMedS
leastMedian = np.median(errorArr)
numMatches = len(m1)
# compute standard deviation
std, threshold = self.compute_standard_deviation(
numMatches, leastMedian)
# determine inliers and compute median
mask = errorArr < threshold
npts1 = m1[mask]
npts2 = m2[mask]
mediani.append(leastMedian)
# break condition (after five iterations)
if leastMedian >= mediani[i] and (i > 4):
Fi.append(Fglob)
npts1i.append(npts1)
npts2i.append(npts2)
stdi.append(std)
i += 1
break
else:
Fi.append(Fglob)
npts1i.append(npts1)
npts2i.append(npts2)
stdi.append(std)
i += 1
return npts1i, npts2i, Fi, i, mediani, stdi
[docs]
def local_opt_ransac(self, matches, epiSet):
"""
Local optimization using RANSAC.
Parameters
----------
matches : dict
Dictionary containing the matches in the stereo pair.
epiSet : dict
Dictionary containing the initial fundamental matrix and inliers.
Returns
-------
npts1i : list of ndarray
Inlier points in image 1 per iteration.
npts2i : list of ndarray
Inlier points in image 2 per iteration.
Fi : list of ndarray
Fundamental matrices estimated per iteration.
i : int
Number of iterations performed.
lenPts : list of int
Number of inliers across iterations.
stdi : list of float
Standard deviations estimated per iteration.
"""
costFunction = self.costFunction
stdi = []
stdi.append(epiSet["std"])
i = 0
Fi = []
npts1i = []
npts2i = []
lenPts = []
# append the initial correspondences and fundamental matrix
F0 = epiSet["F"]
pts1 = epiSet["pts1"]
pts2 = epiSet["pts2"]
m1 = matches["pts1"]
m2 = matches["pts2"]
Fi.append(F0)
npts1i.append(pts1[:, :2])
npts2i.append(pts2[:, :2])
lenPts.append(len(pts1))
while True:
leni = len(npts1i[i])
pp12 = np.hstack((npts1i[i][:, 0:2], npts2i[i][:, 0:2]))
Fglob = self.single_affine_fundamental_matrix(pp12)
errorArr = dist.geometric_distance(
m1, m2, Fglob, costFunction, score="single")
# compute LMedS and number of matches
leastMedian = np.median(errorArr)
numMatches = len(m1)
# compute standard deviation
std, threshold = self.compute_standard_deviation(
numMatches, leastMedian)
# determine inliers and get cost
mask = errorArr < threshold
npts1 = m1[mask]
npts2 = m2[mask]
lenNew = len(npts1)
lenPts.append(lenNew)
# break condition (after five iterations)
if lenNew <= leni and i > 4:
Fi.append(Fglob)
npts1i.append(npts1)
npts2i.append(npts2)
stdi.append(std)
i += 1
break
else:
Fi.append(Fglob)
npts1i.append(npts1)
npts2i.append(npts2)
stdi.append(std)
i += 1
return npts1i, npts2i, Fi, i, lenPts, stdi
[docs]
def local_opt_msac(self, matches, epiSet):
"""
Local optimization using MSAC.
Parameters
----------
matches : dict
Dictionary containing the matches in the stereo pair.
epiSet : dict
Dictionary containing the initial fundamental matrix and inliers.
Returns
-------
npts1i : list of ndarray
Inlier points in image 1 per iteration.
npts2i : list of ndarray
Inlier points in image 2 per iteration.
Fi : list of ndarray
Fundamental matrices estimated per iteration.
i : int
Number of iterations performed.
rhoi : list of float
Cost values computed per iteration.
stdi : list of float
Standard deviations estimated per iteration.
"""
costFunction = self.costFunction
Tfac = self.kappa
stdi = []
stdi.append(epiSet["std"])
i = 0
Fi = []
npts1i = []
npts2i = []
rhoi = []
# append the initial correspondences and fundamental matrix
F0 = epiSet["F"]
pts1 = epiSet["pts1"]
pts2 = epiSet["pts2"]
m1 = matches["pts1"]
m2 = matches["pts2"]
# compute initial cost
errorArr = dist.geometric_distance(
pts1, pts2, F0, costFunction, score="single")
rho0 = np.sum(errorArr) + (len(m1) - len(pts1)) * (Tfac * stdi[0]) ** 2
Fi.append(F0)
npts1i.append(pts1[:, :2])
npts2i.append(pts2[:, :2])
rhoi.append(rho0)
while True:
# compute Fa with all correspondences involved
pp12 = np.hstack((npts1i[i][:, 0:2], npts2i[i][:, 0:2]))
Fnew = self.single_affine_fundamental_matrix(pp12)
# compute error distances
errorArr = dist.geometric_distance(
m1, m2, Fnew, costFunction, score="single")
# compute LMedS and number of matches
leastMedian = np.median(errorArr)
numMatches = len(m1)
# compute standard deviation
std, threshold = self.compute_standard_deviation(
numMatches, leastMedian)
# compute cost function rho for each point:
# cost of outliers = threshold, cost of inliers = errorArr
mask = errorArr >= threshold
errorArr[mask] = threshold
rhoNew = np.sum(errorArr)
# find inliers
newMask = errorArr < threshold
npts1 = m1[newMask]
npts2 = m2[newMask]
rhoi.append(rhoNew)
# break condition (after five iterations)
if rhoNew >= rhoi[i] and i > 4:
Fi.append(Fnew)
npts1i.append(npts1)
npts2i.append(npts2)
stdi.append(std)
i += 1
break
else:
Fi.append(Fnew)
npts1i.append(npts1)
npts2i.append(npts2)
stdi.append(std)
i += 1
return npts1i, npts2i, Fi, i, rhoi, stdi
[docs]
def local_opt_mlesac(self, matches, epiSet):
"""
Local optimization using MLESAC.
Parameters
----------
matches : dict
Dictionary containing the matches in the stereo pair.
epiSet : dict
Dictionary containing the initial fundamental matrix and inliers.
Returns
-------
npts1i : list of ndarray
Inlier points in image 1 per iteration.
npts2i : list of ndarray
Inlier points in image 2 per iteration.
Fi : list of ndarray
Fundamental matrices estimated per iteration.
i : int
Number of iterations performed.
rhoi : list of float
Cost values computed per iteration.
stdi : list of float
Standard deviations estimated per iteration.
"""
costFunction = self.costFunction
vDist = self.searchRange
stdi = []
stdi.append(epiSet["std"])
i = 0
i = 0
Fi = []
npts1i = []
npts2i = []
rhoi = []
# append the initial correspondences and fundamental matrix
F0 = epiSet["F"]
pts1 = epiSet["pts1"]
pts2 = epiSet["pts2"]
m1 = matches[0]
m2 = matches[1]
std = stdi[0]
# compute residuals
errorArr = self.geometric_distance(
m1, m2, F0, costFunction, score="single")
# number of matches
numMatches = len(m1)
# maximization expectation algorithm (5 iterations)
gamma = self.max_expectation_gamma(std, errorArr, numMatches, vDist, iters=5)
# compute logarithmic probability function of mixed distributions for each point
Pi = gamma * (1 / (np.sqrt(2 * np.pi) * std)) * \
np.exp(-errorArr / (2 * std**2))
Po = (1 - gamma) / vDist
L = -np.log(Pi + Po)
# compute the cost function as summed logs
rho0 = np.sum(L)
Fi.append(F0)
npts1i.append(pts1[:, :2])
npts2i.append(pts2[:, :2])
rhoi.append(rho0)
while True:
# compute Fa with all correspondences involved
pp12 = np.hstack((npts1i[i][:, 0:2], npts2i[i][:, 0:2]))
Fnew = self.single_affine_fundamental_matrix(pp12)
# compute residuals
errorArr = self.geometric_distance(
m1, m2, Fnew, costFunction, score="single")
# compute LMedS and number of matches
leastMedian = np.median(errorArr)
numMatches = len(m1)
# compute standard deviation
std, threshold = self.compute_standard_deviation(
numMatches, leastMedian)
gamma = self.max_expectation_gamma(
std, errorArr, numMatches, vDist, iters=5)
# compute logarithmic probability function of mixed distributions for each point
Pi = (gamma * (1 / (np.sqrt(2 * np.pi) * std))
* np.exp(-errorArr / (2 * std**2)))
Po = (1 - gamma) / vDist
# natural logarithm
L = -np.log(Pi + Po)
# determine inliers
mask = errorArr < threshold
# compute cost
rhoNew = np.sum(L)
npts1 = m1[mask]
npts2 = m2[mask]
rhoi.append(rhoNew)
# break condition (after five iterations)
if rhoNew >= rhoi[i] and i > 4:
Fi.append(Fnew)
npts1i.append(npts1)
npts2i.append(npts2)
stdi.append(std)
i += 1
break
else:
Fi.append(Fnew)
npts1i.append(npts1)
npts2i.append(npts2)
stdi.append(std)
i += 1
return npts1i, npts2i, Fi, i, rhoi, stdi
[docs]
def max_expectation_gamma(self, std, errorArr, numMatches, vDist, iters=5):
"""
Compute the mixture parameter gamma via the expectation-maximization algorithm.
Parameters
----------
std : float
Standard deviation of the residuals.
errorArr : ndarray
Array of squared error values.
numMatches : int
Number of matches.
vDist : float
Parameter defined by the disparity search range.
iters : int, optional
Number of EM iterations. Default is 5.
Returns
-------
gamma : float
Estimated mixture parameter.
"""
# gamma is the start value of the mixture parameter
gamma = 0.5
# expectation maximization algorithm according to Torr: MLESAC
# errorArr is already squared
for j in range(iters):
Pi = (
gamma
* (1 / (np.sqrt(2 * np.pi) * std))
* np.exp(-errorArr / (2 * std**2))
)
Po = (1 - gamma) / vDist
z = Pi / (Pi + Po)
z_s = np.sum(z)
gamma = z_s / numMatches
return gamma
[docs]
def compute_standard_deviation(self, numMatches, leastMedian):
"""
Compute robust or fixed estimate of the standard deviation.
Parameters
----------
numMatches : int
Number of matches.
leastMedian : float
Least median of residuals.
Returns
-------
std : float
Estimated or fixed standard deviation.
threshold : float
Threshold computed as ``(kappa * std) ** 2``.
"""
if self.stdEstimation == "adaptive":
# length of the parameter vector is 4 due to the affine epipolar geometry
lenPv = 4
# formula according to Leroy and Rousseaux
std = 1.4826 * (1 + 5 / (numMatches - lenPv)) * np.sqrt(leastMedian)
threshold = (self.kappa * std) ** 2
else:
std = self.stdFixed
threshold = (self.kappa * std) ** 2
return std, threshold
[docs]
def single_affine_fundamental_matrix(self, matches):
"""
Estimate an affine fundamental matrix using the affine gold standard algorithm.
Parameters
----------
matches : ndarray of shape (n, 4)
Array of point correspondences with format (x1, y1, x2, y2).
Returns
-------
F : ndarray of shape (3, 3)
Estimated affine fundamental matrix.
"""
# adjust the order of points as defined by Hartley and Zisserman
X = matches[:, [2, 3, 0, 1]]
# compute the centroid
n = len(X)
centroid = np.sum(X, axis=0) / n
# compute the (n x 4) matrix A of the centered matches
A = X - np.tile(centroid, (n, 1))
# the solution vector is given by the singular vector corresponding to the smallest
# singular value of matrix A which is the last row of matrix VT
U, s, VT = np.linalg.svd(A)
f = VT[-1, :]
# reshape values to obtain the affine fundamental matrix accoring to Hartley and Zisserman MVG
F = np.zeros((3, 3))
F[0, 2] = f[0]
F[1, 2] = f[1]
F[2, 0] = f[2]
F[2, 1] = f[3]
F[2, 2] = -np.dot(f, centroid)
return F
[docs]
def geometric_distance(self, pts1, pts2, F, costFunction, score):
"""
Compute geometric distance between corresponding points under a fundamental matrix.
Parameters
----------
pts1 : ndarray of shape (n, 2) or (n, 3)
Points from the first image.
pts2 : ndarray of shape (n, 2) or (n, 3)
Points from the second image.
F : ndarray of shape (3, 3)
Fundamental matrix.
costFunction : {'sampson', 'symEpi'}
Cost function to compute residuals.
score : {'sum', 'single'}
Whether to return the sum of distances or per-point distances.
Returns
-------
distance : float or ndarray
Geometric distance(s) depending on `score`.
"""
if len(pts1[0]) == 2:
length = len(pts1)
ON = np.ones((length, 1))
pts1 = np.concatenate((pts1, ON), 1)
pts2 = np.concatenate((pts2, ON), 1)
npts1 = np.einsum("ij->ji", pts1)
npts2 = np.einsum("ij->ji", pts2)
a = np.einsum("ji,jk->ik", npts2, F)
a = np.einsum("ij,ji->i", a, npts1)
a = np.square(a)
temp1 = np.einsum("ij,jk->ik", F, npts1)
temp2 = np.einsum("ji,jk->ik", F, npts2)
b = np.square(temp1[0])
c = np.square(temp1[1])
d = np.square(temp2[0])
e = np.square(temp2[1])
if self.costFunction == "sampson": # Sampson
geoDist = a / (b + c + d + e)
elif self.costFunction == "symEpi": # symmetric epipolar distance
geoDist = a * (1 / (b + c) + 1 / (d + e))
if score == "sum":
distance = np.einsum("i->", geoDist)
elif score == "single":
distance = geoDist
return distance