Source code for miop.optimize_epipolar_geometry

# Original code from Stefan Toeberg (2023)
# Copyright (c) 2023, Stefan Toeberg.
#
# Modified by Maxime Paschoud (2025)
# Copyright (c) 2025, Maxime Paschoud.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# (http://opensource.org/licenses/BSD-3-Clause)
#
# __author__ = "Stefan Toeberg, LUH: IMR; Maxime Paschoud, ETHZ"
#  

from pipeline import DAGNode
import numpy as np


[docs] class OptimizeEpipolarGeometry(DAGNode): """ Optimize epipolar geometry using local refinement methods. This class performs iterative local optimization of fundamental matrices and inliers obtained from robust estimation schemes (e.g., RANSAC, LMedS, MSAC, MLESAC). Different cost functions (Sampson, symmetric epipolar) and robust methods are supported. Parameters ---------- verbose : bool, optional If True, prints diagnostic information during optimization. Default is False. costFunction : {'sampson', 'symEpi'}, optional Cost function to use for evaluating epipolar geometry. Default is 'sampson'. kappa : float, optional Scaling factor for computing inlier thresholds. Default is 2.5. searchRange : int, optional Search range (in pixels) for disparity when evaluating guided matching. Default is 200. robustMethod : {'LMedS', 'RANSAC', 'MSAC', 'MLESAC'}, optional Robust estimation method for local optimization. Default is 'MLESAC'. stdFixed : float, optional Fixed value of the standard deviation used when `stdEstimation='fixed'`. Default is 1.0. stdEstimation : {'adaptive', 'fixed'}, optional Method to estimate standard deviation. If 'adaptive', a robust estimate is computed per iteration; if 'fixed', uses `stdFixed`. Default is 'adaptive'. """ def __init__(self, verbose=False, costFunction='sampson', kappa=2.5, searchRange=200, robustMethod='MLESAC', stdFixed=1., stdEstimation='adaptive'): super().__init__() self.robustMethod = robustMethod self.costFunction = costFunction # sampson self.verbose = verbose self.kappa = kappa self.searchRange=searchRange self.stdFixed = stdFixed self.stdEstimation = stdEstimation
[docs] def eval(self, epipolarSets, matches): """ Perform iterative local optimization over inlier matches. Parameters ---------- epipolarSets : list of dict List of dictionaries containing the fundamental matrix (`'F'`) and corresponding inliers for each stereo pair, as estimated by a SAC scheme. matches : list of dict List of dictionaries containing matched points for each stereo pair. Returns ------- list of dict Updated `epipolarSets` with optimized fundamental matrices and refined inliers. """ for j, epiSet in enumerate(epipolarSets): npts1i, npts2i, Fi, i, rhoi, sigi = self.local_opt_affine( matches[j], epiSet) # select best result yielding the lowest cost/best score, perform local opt at least once if self.robustMethod == 'RANSAC': idxBestScore = np.argmax(rhoi[1:])+1 else: idxBestScore = np.argmin(rhoi[1:])+1 # update the fundamental matrix and corresponding inliers epiSet['F'] = Fi[idxBestScore] epiSet['pts1'] = npts1i[idxBestScore] epiSet['pts2'] = npts2i[idxBestScore] """ if self.verbose == True: # compute and print the rmse errorArr of the chosen cost computed for all inliers if self.costFunction == 'sampson': _, _, rmseResiduals, _ = dist.sampson_distance( epiSet['pts1'], epiSet['pts2'], epiSet['F']) elif self.costFunction == 'symEpi': _, _, rmseResiduals, _ = dist.sym_epi_dist( epiSet['pts1'], epiSet['pts2'], epiSet['F']) print('iters = {}, score/cost - {}: {:.4f} -> GM: {:.4f};\n'.format(idxBestScore, self.robustMethod, rhoi[0], rhoi[idxBestScore]), end='', flush=True) print('{:<24} RMSE({}) = {:.4f} px², {} matches'.format( ' ', self.costFunction, rmseResiduals, len(epiSet['pts1']))) """ self.epipolarSets = epipolarSets return [epipolarSets]
[docs] def local_opt_affine(self, matches, epiSet): """ Perform iterative local optimization of the best set yielded by the SAC scheme. Guided matching is performed using the updated epipolar geometry until convergence or a stopping criterion is reached. Parameters ---------- matches : dict Dictionary containing the matches in the stereo pair. epiSet : dict Dictionary containing the initial fundamental matrix (`'F'`) and corresponding inliers from the SAC scheme. Returns ------- npts1i : list of ndarray List of inlier points from image 1 across iterations. npts2i : list of ndarray List of inlier points from image 2 across iterations. Fi : list of ndarray Fundamental matrices estimated across iterations. i : int Number of iterations performed. rho : list Cost values across iterations (definition depends on chosen method). stdi : list of float Standard deviations of residuals at each iteration. """ if self.robustMethod == "LMedS": return local_opt_lmeds(matches, epiSet) elif self.robustMethod == "RANSAC": return local_opt_ransac(matches, epiSet) elif self.robustMethod == "MSAC": return local_opt_msac(matches, epiSet) elif self.robustMethod == "MLESAC": return self.local_opt_mlesac(matches, epiSet) else: return self.local_opt_mlesac(matches, epiSet)
[docs] def local_opt_lmeds(self, matches, epiSet): """ Local optimization using Least Median of Squares (LMedS). Parameters ---------- matches : dict Dictionary containing the matches in the stereo pair. epiSet : dict Dictionary containing the initial fundamental matrix and inliers. Returns ------- npts1i : list of ndarray Inlier points in image 1 per iteration. npts2i : list of ndarray Inlier points in image 2 per iteration. Fi : list of ndarray Fundamental matrices estimated per iteration. i : int Number of iterations performed. mediani : list of float Median residuals across iterations. stdi : list of float Standard deviations estimated per iteration. """ costFunction = self.costFunction stdi = [] stdi.append(epiSet["std"]) i = 0 Fi = [] npts1i = [] npts2i = [] mediani = [] # append the initial correspondences and fundamental matrix F0 = epiSet["F"] pts1 = epiSet["pts1"] pts2 = epiSet["pts2"] m1 = matches["pts1"] m2 = matches["pts2"] Fi.append(F0) npts1i.append(pts1[:, :2]) npts2i.append(pts2[:, :2]) errorArr = dist.geometric_distance( m1, m2, F0, costFunction, score="single") leastMedian = np.median(errorArr) mediani.append(leastMedian) while True: pp12 = np.hstack((npts1i[i][:, 0:2], npts2i[i][:, 0:2])) Fglob = self.single_affine_fundamental_matrix(pp12) # compute residuals errorArr = dist.geometric_distance( m1, m2, Fglob, costFunction, score="single") # compute LMedS leastMedian = np.median(errorArr) numMatches = len(m1) # compute standard deviation std, threshold = self.compute_standard_deviation( numMatches, leastMedian) # determine inliers and compute median mask = errorArr < threshold npts1 = m1[mask] npts2 = m2[mask] mediani.append(leastMedian) # break condition (after five iterations) if leastMedian >= mediani[i] and (i > 4): Fi.append(Fglob) npts1i.append(npts1) npts2i.append(npts2) stdi.append(std) i += 1 break else: Fi.append(Fglob) npts1i.append(npts1) npts2i.append(npts2) stdi.append(std) i += 1 return npts1i, npts2i, Fi, i, mediani, stdi
[docs] def local_opt_ransac(self, matches, epiSet): """ Local optimization using RANSAC. Parameters ---------- matches : dict Dictionary containing the matches in the stereo pair. epiSet : dict Dictionary containing the initial fundamental matrix and inliers. Returns ------- npts1i : list of ndarray Inlier points in image 1 per iteration. npts2i : list of ndarray Inlier points in image 2 per iteration. Fi : list of ndarray Fundamental matrices estimated per iteration. i : int Number of iterations performed. lenPts : list of int Number of inliers across iterations. stdi : list of float Standard deviations estimated per iteration. """ costFunction = self.costFunction stdi = [] stdi.append(epiSet["std"]) i = 0 Fi = [] npts1i = [] npts2i = [] lenPts = [] # append the initial correspondences and fundamental matrix F0 = epiSet["F"] pts1 = epiSet["pts1"] pts2 = epiSet["pts2"] m1 = matches["pts1"] m2 = matches["pts2"] Fi.append(F0) npts1i.append(pts1[:, :2]) npts2i.append(pts2[:, :2]) lenPts.append(len(pts1)) while True: leni = len(npts1i[i]) pp12 = np.hstack((npts1i[i][:, 0:2], npts2i[i][:, 0:2])) Fglob = self.single_affine_fundamental_matrix(pp12) errorArr = dist.geometric_distance( m1, m2, Fglob, costFunction, score="single") # compute LMedS and number of matches leastMedian = np.median(errorArr) numMatches = len(m1) # compute standard deviation std, threshold = self.compute_standard_deviation( numMatches, leastMedian) # determine inliers and get cost mask = errorArr < threshold npts1 = m1[mask] npts2 = m2[mask] lenNew = len(npts1) lenPts.append(lenNew) # break condition (after five iterations) if lenNew <= leni and i > 4: Fi.append(Fglob) npts1i.append(npts1) npts2i.append(npts2) stdi.append(std) i += 1 break else: Fi.append(Fglob) npts1i.append(npts1) npts2i.append(npts2) stdi.append(std) i += 1 return npts1i, npts2i, Fi, i, lenPts, stdi
[docs] def local_opt_msac(self, matches, epiSet): """ Local optimization using MSAC. Parameters ---------- matches : dict Dictionary containing the matches in the stereo pair. epiSet : dict Dictionary containing the initial fundamental matrix and inliers. Returns ------- npts1i : list of ndarray Inlier points in image 1 per iteration. npts2i : list of ndarray Inlier points in image 2 per iteration. Fi : list of ndarray Fundamental matrices estimated per iteration. i : int Number of iterations performed. rhoi : list of float Cost values computed per iteration. stdi : list of float Standard deviations estimated per iteration. """ costFunction = self.costFunction Tfac = self.kappa stdi = [] stdi.append(epiSet["std"]) i = 0 Fi = [] npts1i = [] npts2i = [] rhoi = [] # append the initial correspondences and fundamental matrix F0 = epiSet["F"] pts1 = epiSet["pts1"] pts2 = epiSet["pts2"] m1 = matches["pts1"] m2 = matches["pts2"] # compute initial cost errorArr = dist.geometric_distance( pts1, pts2, F0, costFunction, score="single") rho0 = np.sum(errorArr) + (len(m1) - len(pts1)) * (Tfac * stdi[0]) ** 2 Fi.append(F0) npts1i.append(pts1[:, :2]) npts2i.append(pts2[:, :2]) rhoi.append(rho0) while True: # compute Fa with all correspondences involved pp12 = np.hstack((npts1i[i][:, 0:2], npts2i[i][:, 0:2])) Fnew = self.single_affine_fundamental_matrix(pp12) # compute error distances errorArr = dist.geometric_distance( m1, m2, Fnew, costFunction, score="single") # compute LMedS and number of matches leastMedian = np.median(errorArr) numMatches = len(m1) # compute standard deviation std, threshold = self.compute_standard_deviation( numMatches, leastMedian) # compute cost function rho for each point: # cost of outliers = threshold, cost of inliers = errorArr mask = errorArr >= threshold errorArr[mask] = threshold rhoNew = np.sum(errorArr) # find inliers newMask = errorArr < threshold npts1 = m1[newMask] npts2 = m2[newMask] rhoi.append(rhoNew) # break condition (after five iterations) if rhoNew >= rhoi[i] and i > 4: Fi.append(Fnew) npts1i.append(npts1) npts2i.append(npts2) stdi.append(std) i += 1 break else: Fi.append(Fnew) npts1i.append(npts1) npts2i.append(npts2) stdi.append(std) i += 1 return npts1i, npts2i, Fi, i, rhoi, stdi
[docs] def local_opt_mlesac(self, matches, epiSet): """ Local optimization using MLESAC. Parameters ---------- matches : dict Dictionary containing the matches in the stereo pair. epiSet : dict Dictionary containing the initial fundamental matrix and inliers. Returns ------- npts1i : list of ndarray Inlier points in image 1 per iteration. npts2i : list of ndarray Inlier points in image 2 per iteration. Fi : list of ndarray Fundamental matrices estimated per iteration. i : int Number of iterations performed. rhoi : list of float Cost values computed per iteration. stdi : list of float Standard deviations estimated per iteration. """ costFunction = self.costFunction vDist = self.searchRange stdi = [] stdi.append(epiSet["std"]) i = 0 i = 0 Fi = [] npts1i = [] npts2i = [] rhoi = [] # append the initial correspondences and fundamental matrix F0 = epiSet["F"] pts1 = epiSet["pts1"] pts2 = epiSet["pts2"] m1 = matches[0] m2 = matches[1] std = stdi[0] # compute residuals errorArr = self.geometric_distance( m1, m2, F0, costFunction, score="single") # number of matches numMatches = len(m1) # maximization expectation algorithm (5 iterations) gamma = self.max_expectation_gamma(std, errorArr, numMatches, vDist, iters=5) # compute logarithmic probability function of mixed distributions for each point Pi = gamma * (1 / (np.sqrt(2 * np.pi) * std)) * \ np.exp(-errorArr / (2 * std**2)) Po = (1 - gamma) / vDist L = -np.log(Pi + Po) # compute the cost function as summed logs rho0 = np.sum(L) Fi.append(F0) npts1i.append(pts1[:, :2]) npts2i.append(pts2[:, :2]) rhoi.append(rho0) while True: # compute Fa with all correspondences involved pp12 = np.hstack((npts1i[i][:, 0:2], npts2i[i][:, 0:2])) Fnew = self.single_affine_fundamental_matrix(pp12) # compute residuals errorArr = self.geometric_distance( m1, m2, Fnew, costFunction, score="single") # compute LMedS and number of matches leastMedian = np.median(errorArr) numMatches = len(m1) # compute standard deviation std, threshold = self.compute_standard_deviation( numMatches, leastMedian) gamma = self.max_expectation_gamma( std, errorArr, numMatches, vDist, iters=5) # compute logarithmic probability function of mixed distributions for each point Pi = (gamma * (1 / (np.sqrt(2 * np.pi) * std)) * np.exp(-errorArr / (2 * std**2))) Po = (1 - gamma) / vDist # natural logarithm L = -np.log(Pi + Po) # determine inliers mask = errorArr < threshold # compute cost rhoNew = np.sum(L) npts1 = m1[mask] npts2 = m2[mask] rhoi.append(rhoNew) # break condition (after five iterations) if rhoNew >= rhoi[i] and i > 4: Fi.append(Fnew) npts1i.append(npts1) npts2i.append(npts2) stdi.append(std) i += 1 break else: Fi.append(Fnew) npts1i.append(npts1) npts2i.append(npts2) stdi.append(std) i += 1 return npts1i, npts2i, Fi, i, rhoi, stdi
[docs] def max_expectation_gamma(self, std, errorArr, numMatches, vDist, iters=5): """ Compute the mixture parameter gamma via the expectation-maximization algorithm. Parameters ---------- std : float Standard deviation of the residuals. errorArr : ndarray Array of squared error values. numMatches : int Number of matches. vDist : float Parameter defined by the disparity search range. iters : int, optional Number of EM iterations. Default is 5. Returns ------- gamma : float Estimated mixture parameter. """ # gamma is the start value of the mixture parameter gamma = 0.5 # expectation maximization algorithm according to Torr: MLESAC # errorArr is already squared for j in range(iters): Pi = ( gamma * (1 / (np.sqrt(2 * np.pi) * std)) * np.exp(-errorArr / (2 * std**2)) ) Po = (1 - gamma) / vDist z = Pi / (Pi + Po) z_s = np.sum(z) gamma = z_s / numMatches return gamma
[docs] def compute_standard_deviation(self, numMatches, leastMedian): """ Compute robust or fixed estimate of the standard deviation. Parameters ---------- numMatches : int Number of matches. leastMedian : float Least median of residuals. Returns ------- std : float Estimated or fixed standard deviation. threshold : float Threshold computed as ``(kappa * std) ** 2``. """ if self.stdEstimation == "adaptive": # length of the parameter vector is 4 due to the affine epipolar geometry lenPv = 4 # formula according to Leroy and Rousseaux std = 1.4826 * (1 + 5 / (numMatches - lenPv)) * np.sqrt(leastMedian) threshold = (self.kappa * std) ** 2 else: std = self.stdFixed threshold = (self.kappa * std) ** 2 return std, threshold
[docs] def single_affine_fundamental_matrix(self, matches): """ Estimate an affine fundamental matrix using the affine gold standard algorithm. Parameters ---------- matches : ndarray of shape (n, 4) Array of point correspondences with format (x1, y1, x2, y2). Returns ------- F : ndarray of shape (3, 3) Estimated affine fundamental matrix. """ # adjust the order of points as defined by Hartley and Zisserman X = matches[:, [2, 3, 0, 1]] # compute the centroid n = len(X) centroid = np.sum(X, axis=0) / n # compute the (n x 4) matrix A of the centered matches A = X - np.tile(centroid, (n, 1)) # the solution vector is given by the singular vector corresponding to the smallest # singular value of matrix A which is the last row of matrix VT U, s, VT = np.linalg.svd(A) f = VT[-1, :] # reshape values to obtain the affine fundamental matrix accoring to Hartley and Zisserman MVG F = np.zeros((3, 3)) F[0, 2] = f[0] F[1, 2] = f[1] F[2, 0] = f[2] F[2, 1] = f[3] F[2, 2] = -np.dot(f, centroid) return F
[docs] def geometric_distance(self, pts1, pts2, F, costFunction, score): """ Compute geometric distance between corresponding points under a fundamental matrix. Parameters ---------- pts1 : ndarray of shape (n, 2) or (n, 3) Points from the first image. pts2 : ndarray of shape (n, 2) or (n, 3) Points from the second image. F : ndarray of shape (3, 3) Fundamental matrix. costFunction : {'sampson', 'symEpi'} Cost function to compute residuals. score : {'sum', 'single'} Whether to return the sum of distances or per-point distances. Returns ------- distance : float or ndarray Geometric distance(s) depending on `score`. """ if len(pts1[0]) == 2: length = len(pts1) ON = np.ones((length, 1)) pts1 = np.concatenate((pts1, ON), 1) pts2 = np.concatenate((pts2, ON), 1) npts1 = np.einsum("ij->ji", pts1) npts2 = np.einsum("ij->ji", pts2) a = np.einsum("ji,jk->ik", npts2, F) a = np.einsum("ij,ji->i", a, npts1) a = np.square(a) temp1 = np.einsum("ij,jk->ik", F, npts1) temp2 = np.einsum("ji,jk->ik", F, npts2) b = np.square(temp1[0]) c = np.square(temp1[1]) d = np.square(temp2[0]) e = np.square(temp2[1]) if self.costFunction == "sampson": # Sampson geoDist = a / (b + c + d + e) elif self.costFunction == "symEpi": # symmetric epipolar distance geoDist = a * (1 / (b + c) + 1 / (d + e)) if score == "sum": distance = np.einsum("i->", geoDist) elif score == "single": distance = geoDist return distance