Source code for miop.epipolar_geometry

# Original code from Stefan Toeberg (2023)
# Copyright (c) 2023, Stefan Toeberg.
#
# Modified by Maxime Paschoud (2025)
# Copyright (c) 2025, Maxime Paschoud.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# (http://opensource.org/licenses/BSD-3-Clause)
#
# __author__ = "Stefan Toeberg, LUH: IMR; Maxime Paschoud, ETHZ: CMBM"
#  


from pipeline import DAGNode

import numpy as np

[docs] class EpipolarGeometry(DAGNode): def __init__(self, numSamples=1000, kappa=2.5, stdFixed=1., searchRange=200, bucketing=False, stdEstimation='adaptive', nSets=1, verbose=False, cost_function='sampson', robust_method="MLESAC"): """ Initialize the EpipolarGeometry module for robust fundamental matrix estimation. Parameters ---------- numSamples : int, optional Number of fundamental matrix estimations to perform (default is 1000). kappa : float, optional Multiplier for standard deviation thresholding (default is 2.5). stdFixed : float, optional Fixed standard deviation value if `stdEstimation` is 'fixed' (default is 1.0). searchRange : int, optional Search range used for robust cost modeling (default is 200). bucketing : bool, optional Whether to use spatial bucketing for match sampling (default is False). stdEstimation : {'adaptive', 'fixed'}, optional Method for standard deviation estimation (default is 'adaptive'). nSets : int, optional Number of top fundamental matrices to keep (default is 1). verbose : bool, optional Whether to print intermediate results (default is False). cost_function : str, optional Cost function to evaluate match quality ('sampson' or 'symEpi') (default is 'sampson'). robust_method : str, optional Robust estimation method to use ('RANSAC', 'MSAC', 'LMedS', 'MLESAC') (default is 'MLESAC'). """ super().__init__() self.kappa = kappa self.searchRange = searchRange self.stdFixed = stdFixed self.stdEstimation = stdEstimation # adaptive or fixed self.bucketing = bucketing # Boolean, True to choose the 4 points using bucketing techniques. False: random select self.numSamples = numSamples # how many Fundamental matrix estimation we perform self.nSets = nSets # how many best Fundamental matrices and inlier sets we want to consider self.verbose = verbose # print intermediate results self.cost_function = cost_function # distance function to evaluate quality of the reprojection self.robust_method = robust_method # the method to perform how to detect good estimation (RANSAC etc...)
[docs] def robust_sampson(self, Farr, pts1, pts2): """ Compute Sampson distances for each match and each fundamental matrix sample. Parameters ---------- Farr : ndarray of shape (numSamples, 3, 3) Fundamental matrices. pts1 : ndarray of shape (n, 3) Homogenized points from the first image. pts2 : ndarray of shape (n, 3) Homogenized points from the second image. Returns ------- sampsonDist : ndarray of shape (numSamples, n) Computed Sampson distances. """ # compute all neccessary terms a = np.einsum("ij,kjl->kil", pts2, Farr) a = np.einsum("ijk,jk->ij", a, pts1) asq = np.square(a) temp1 = np.einsum("ijk,lk->ijl", Farr, pts1) temp2 = np.einsum("ijk,lj->ikl", Farr, pts2) b = np.square(temp1[:, 0, :]) c = np.square(temp1[:, 1, :]) d = np.square(temp2[:, 0, :]) e = np.square(temp2[:, 1, :]) # calculate Sampson distance for each point for each sample sampsonDist = asq / (b + c + d + e) return sampsonDist
[docs] def eval(self, matches): """ Perform robust estimation of fundamental matrices for multiple stereo pairs. Parameters ---------- matches : list of tuples Each tuple contains matched 2D points (pts1, pts2) for one stereo pair. Returns ------- list A list of dictionaries or a list of lists of dictionaries containing: - 'pts1', 'pts2': inlier point coordinates - 'F': estimated fundamental matrix - 'cost': model cost - 'std': standard deviation of residuals """ # numSamples = self.numSamples # nSets = self.nSets # imageSize = np.shape(self.stereoPairs[0][0])[0] print(f"In epipolar matches len: {len(matches)}") self.epipolarSets = [] # len(matches) equals number of stereo pairs # compute robust Fa for every stereo pair print(f"Epipolar len of matches: {len(matches), len(matches[0])}") for match in matches: pts1 = match[0] pts2 = match[1] if len(pts1) < 4: raise Exception('Not enough matches to compute Fa.') # fSets are the n best sets of the SAC routine fSets = self.estimate_robust_affine_fundamental_matrix( pts1, pts2, self.numSamples) # get the number of inliers inMax = fSets[0][0] # estimate the number of numSamples to achieve a probability of 99% for a good set eps = 1-len(inMax)/len(pts1) # pGood = 1-(1-(1-eps)**4)**num_samples, underlying formula pGood = 0.99 ns = np.log(1-pGood)/(np.log(1-(1-eps)**4)) # add a margin of 500 sets to account for degenerate sets (task is not time sensitive) # and check if more samples are needed if (ns+500) > self.numSamples: if np.round(ns) > 10000: raise Exception('Too much outliers in the detected matches for stereo pair {}.\n' 'Number of computed sample capped to 10000.'.format(i+1)) ns = 10000 else: ns = np.round(ns)+500 fSets = self.estimate_robust_affine_fundamental_matrix( pts1, pts2, ns) nSets = [] for n in range(self.nSets): epiSet = {} epiSet['pts1'] = fSets[n][0] epiSet['pts2'] = fSets[n][1] epiSet['F'] = fSets[n][2] epiSet['cost'] = fSets[n][3] epiSet['std'] = fSets[n][-1] nSets.append(epiSet) # in normal mode, when only the best sample is needed, directly append dict if self.nSets == 1: self.epipolarSets.append(epiSet) else: self.epipolarSets.append(nSets) """ if self.verbose == True: # compute and print the residuals for the best F and the corresponding inliers as the rmse if self.cost_function == 'sampson': _, _, rmseResiduals, _ = dist.sampson_distance( nSets[0]['pts1'], nSets[0]['pts2'], nSets[0]['F']) elif self.cost_function == 'symEpi': _, _, rmseResiduals, _ = dist.sym_epi_dist( nSets[0]['pts1'], nSets[0]['pts2'], nSets[0]['F']) print('{} / {} ({:.2f}% inliers), '.format(len(inMax), len(pts1), (len(inMax)/len(pts1))*10**2), end='', flush=True) print('RMSE({}) = {:.4f} px²\n'.format( self.cost_function, rmseResiduals)) """ return [self.epipolarSets]
[docs] def estimate_robust_affine_fundamental_matrix(self, pts1, pts2, numSamples): """ Estimate the best fundamental matrices using robust methods. Parameters ---------- pts1 : ndarray of shape (n, 2) Points from the first image. pts2 : ndarray of shape (n, 2) Points from the second image. numSamples : int Number of models to estimate. Returns ------- setsIn : list List of tuples (inPts1, inPts2, F, cost, std) for the best fundamental matrices. """ method = self.robust_method # compute numSamples of models (fundamental matrices) Farr = self.tensor_affine_fundamental_matrix( pts1, pts2, numSamples) if method == "RANSAC": setsIn = self.ransac(Farr, pts1, pts) elif method == "MSAC": setsIn = self.msac(Farr, pts1, pts2) elif method == "LMedS": setsIn = self.lmeds(Farr, pts1, pts2) elif method == "MLESAC": setsIn = self.mlesac(Farr, pts1, pts2) return setsIn
[docs] def tensor_affine_fundamental_matrix(self, pts1, pts2, numSamples): """ Generate fundamental matrices using the affine gold standard algorithm. Parameters ---------- pts1 : ndarray of shape (n, 2) Points from the first image. pts2 : ndarray of shape (n, 2) Points from the second image. numSamples : int Number of samples to compute. Returns ------- F : ndarray of shape (numSamples, 3, 3) Fundamental matrices. """ # number of points numPoints = len(pts1) if self.bucketing == True: # if enabled, use bucketing technique to select the four matches from which F is computed points1 = np.empty((numSamples, 4, 2)) points2 = np.empty((numSamples, 4, 2)) for i in range(numSamples): # select samples of 4 matches, distributed over the image space #points1[i], points2[i] = utils.bucketing_sac(pts1, pts2, size, 4) break else: # if False, just get 4 random correspondences for each sample idx = [] for i in range(int(numSamples)): idx.append(np.random.choice(range(numPoints), 4, replace=False)) idx = np.reshape(np.asarray(idx), (int(numSamples), 4)) points1 = pts1[idx] points2 = pts2[idx] # adjust the order of points according to Hartley and Zisserman MVG finalSets = np.dstack((points2[:, :, 0:2], points1[:, :, 0:2])) # compute centroids of the point sets in the 3D array n = np.shape(finalSets)[2] centroids = np.einsum("ijk->ik", finalSets) / n centroidsShaped = np.reshape((centroids), (int(numSamples), 1, n)) centroidsShapedTiled = np.tile(centroidsShaped, (1, n, 1)) # solve for F affine A = finalSets - centroidsShapedTiled U, s, VT = np.linalg.svd(A) f = VT[:, 3, :] # extract values and build F F = np.zeros((np.shape(f)[0], 3, 3)) F[:, 0, 2] = f[:, 0] F[:, 1, 2] = f[:, 1] F[:, 2, 0] = f[:, 2] F[:, 2, 1] = f[:, 3] F[:, 2, 2] = -np.einsum("ij,ij->i", f, centroids) return F
[docs] def ransac(self, Farr, pts1, pts2): """ Estimate the fundamental matrix using RANSAC. Parameters ---------- Farr : ndarray of shape (numSamples, 3, 3) Sampled fundamental matrices. pts1 : ndarray of shape (n, 2) Points from the first image. pts2 : ndarray of shape (n, 2) Points from the second image. Returns ------- setsIn : list List of tuples (inPts1, inPts2, F, cost, std) sorted by inlier count. """ nSets = self.nSets errorArr, std, threshold, _ = self.compute_costs_thresholds( pts1, pts2, Farr) # check for which points (inliers) cost function is smaller than threshold mask = errorArr < threshold # count number of inliers trueIndices = np.count_nonzero(mask, 1) # find the n models (n samples) with highest number of inliers bestSamples = np.argpartition(trueIndices, -nSets)[-nSets:] # get indices of inliers corresponding to the best n models goodIndicesSet = mask[bestSamples] # inlier sets inPts1 = [] inPts2 = [] Fin = [] rhoIn = [] # get the best n sets unsorted for i in range(nSets): inPts1.append(pts1[goodIndicesSet[i]]) inPts2.append(pts2[goodIndicesSet[i]]) Fin.append(Farr[bestSamples[i]]) rhoIn.append(np.sum(goodIndicesSet[i])) # sort sets and put them in list stdArr = np.repeat(std, nSets) setsIn = sorted( zip(inPts1, inPts2, Fin, rhoIn, stdArr), key=lambda x: len(x[0]), reverse=True ) setsIn.append(std) return setsIn
[docs] def lmeds(self, Farr, pts1, pts2): """ Estimate the fundamental matrix using the Least Median of Squares (LMedS) method. Parameters ---------- Farr : ndarray of shape (numSamples, 3, 3) Sampled fundamental matrices. pts1 : ndarray of shape (n, 2) Points from the first image. pts2 : ndarray of shape (n, 2) Points from the second image. Returns ------- setsIn : list List of tuples (inPts1, inPts2, F, cost, std) sorted by cost. """ nSets = self.nSets # compute costs errorArr, std, threshold, _ = self.compute_costs_thresholds( pts1, pts2, Farr) rho = np.median(errorArr, 1) # find the nSets best sets (not sorted yet) bestSets = np.argpartition(rho, nSets)[:nSets] newMask = errorArr[bestSets] < threshold # inlier sets inPts1 = [] inPts2 = [] Fin = [] rhoIn = [] # get the corresponding inliers and fundamental matrices (sets not sorted yet) for i in range(nSets): inPts1.append(pts1[newMask[i]]) inPts2.append(pts2[newMask[i]]) Fin.append(Farr[bestSets[i]]) rhoIn.append(rho[bestSets[i]]) # add the estimated standard deviation to every set stdArr = np.repeat(std, nSets) # sort the sets and zip the lists setsIn = sorted( zip(inPts1, inPts2, Fin, rhoIn, stdArr), key=lambda x: x[3], reverse=False ) return setsIn
[docs] def msac(self, Farr, pts1, pts2): """ Estimate the fundamental matrix using the MSAC algorithm. Parameters ---------- Farr : ndarray of shape (numSamples, 3, 3) Sampled fundamental matrices. pts1 : ndarray of shape (n, 2) Points from the first image. pts2 : ndarray of shape (n, 2) Points from the second image. Returns ------- setsIn : list List of tuples (inPts1, inPts2, F, cost, std) sorted by cost. """ nSets = self.nSets errorArr, std, threshold, _ = self.compute_costs_thresholds( pts1, pts2, Farr) # check for which points (outliers) cost function is greater than threshold mask = errorArr >= threshold # compute cost, score of outliers = threshold, score of inliers = cost function errorArr[mask] = threshold rho = np.sum(errorArr, 1) # find the nSets best models with lowest costs bestSets = np.argpartition(rho, nSets)[:nSets] newMask = errorArr[bestSets] < threshold # inlier sets inPts1 = [] inPts2 = [] Fin = [] rhoIn = [] # get the best n sets unsorted for i in range(nSets): inPts1.append(pts1[newMask[i]]) inPts2.append(pts2[newMask[i]]) Fin.append(Farr[bestSets[i]]) rhoIn.append(rho[bestSets[i]]) stdArr = np.repeat(std, nSets) setsIn = sorted( zip(inPts1, inPts2, Fin, rhoIn, stdArr), key=lambda x: x[3], reverse=False ) return setsIn
[docs] def mlesac(self, Farr, pts1, pts2): """ Estimate the fundamental matrix using the MLESAC algorithm. Parameters ---------- Farr : ndarray of shape (numSamples, 3, 3) Sampled fundamental matrices. pts1 : ndarray of shape (n, 2) Points from the first image. pts2 : ndarray of shape (n, 2) Points from the second image. Returns ------- setsIn : list List of tuples (inPts1, inPts2, F, cost, std) sorted by likelihood. """ vDist = self.searchRange nSets = self.nSets errorArr, std, threshold, _ = self.compute_costs_thresholds( pts1, pts2, Farr) numMatches = len(pts1) # compute the mixture parameter gamma = self.max_expectation_gamma_robust( std, errorArr, numMatches, vDist, iters=5) # compute logarithmic probability function of mixed distributions for each point Pi = gamma * (1 / (np.sqrt(2 * np.pi) * std)) * \ np.exp(-errorArr / (2 * std**2)) Po = (1 - gamma) / vDist L = -np.log(Pi + Po) # compute errorArr function rho, which sums logs rho = np.sum(L, 1) bestSets = np.argpartition(rho, nSets)[:nSets] newMask = errorArr[bestSets] < threshold # inlier sets inPts1 = [] inPts2 = [] Fin = [] rhoIn = [] # get the best n sets unsorted for i in range(nSets): inPts1.append(pts1[newMask[i]]) inPts2.append(pts2[newMask[i]]) Fin.append(Farr[bestSets[i]]) rhoIn.append(rho[bestSets[i]]) stdArr = np.repeat(std, nSets) setsIn = sorted( zip(inPts1, inPts2, Fin, rhoIn, stdArr), key=lambda x: x[3], reverse=False ) return setsIn
[docs] def compute_costs_thresholds(self, pts1, pts2, Farr): """ Compute the cost values and inlier threshold. Parameters ---------- pts1 : ndarray of shape (n, 2) Points from the first image. pts2 : ndarray of shape (n, 2) Points from the second image. Farr : ndarray of shape (numSamples, 3, 3) Fundamental matrices. Returns ------- errorArr : ndarray Error array (numSamples x numPoints). std : float Estimated or fixed standard deviation. threshold : float Inlier threshold. leastMedian : float Minimum median value across all samples. """ cost_function = self.cost_function numMatches = len(pts1) # homogenize all points arrOnes = np.ones((numMatches, 1)) pts1 = np.concatenate((pts1, arrOnes), 1) pts2 = np.concatenate((pts2, arrOnes), 1) # compute costs if cost_function == "symEpi": errorArr = dist.robust_residual_distance(Farr, pts1, pts2) else: errorArr = self.robust_sampson(Farr, pts1, pts2) # compute medians of cost function medians = np.median(errorArr, 1) # find smallest median idx = np.argmin(medians) leastMedian = medians[idx] # robust sigma estimation std, threshold = self.compute_standard_deviation( numMatches, leastMedian) return errorArr, std, threshold, leastMedian
[docs] def compute_standard_deviation(self, numMatches, leastMedian): """ Compute standard deviation and inlier threshold. Parameters ---------- numMatches : int Number of matches. leastMedian : float Least median of residuals. Returns ------- std : float Estimated or fixed standard deviation. threshold : float Threshold for inlier classification. """ if self.stdEstimation == "adaptive": # length of the parameter vector is 4 due to the affine epipolar geometry lenPv = 4 # formula according to Leroy and Rousseaux std = 1.4826 * (1 + 5 / (numMatches - lenPv)) * np.sqrt(leastMedian) threshold = (self.kappa * std) ** 2 else: std = self.stdFixed threshold = (self.kappa * std) ** 2 return std, threshold
[docs] def max_expectation_gamma_robust(self, std, errorArr, numMatches, vDist, iters=5): """ Estimate mixture parameter gamma via EM for MLESAC. Parameters ---------- std : float Estimated standard deviation. errorArr : ndarray Error values (squared distances). numMatches : int Number of matches. vDist : float Search range (for uniform outlier model). iters : int, optional Number of EM iterations (default is 5). Returns ------- gamma : ndarray of shape (numSamples, 1) Mixture parameter gamma values. """ # gamma is the start value of the mixture parameter gamma = 0.5 # expectation maximization algorithm according to Torr: MLESAC # errorArr is already squared for j in range(iters): Pi = ( gamma * (1 / (np.sqrt(2 * np.pi) * std)) * np.exp(-errorArr / (2 * std**2)) ) Po = (1 - gamma) / vDist z = Pi / (Pi + Po) z_s = np.sum(z, 1) gamma = z_s / numMatches gamma = np.reshape(gamma, (len(errorArr), 1)) return gamma
[docs] def single_affine_fundamental_matrix(self, matches): """ Estimate a single fundamental matrix from matched points. Parameters ---------- matches : ndarray of shape (n, 4) Matched keypoints or points between two images. Each element of the array represents one corresponding point (x1, y1, x2, y2) Returns ------- F : ndarray of shape (3, 3) The estimated affine fundamental matrix """ # adjust the order of points as defined by Hartley and Zisserman X = matches[:, [2, 3, 0, 1]] # compute the centroid n = len(X) centroid = np.sum(X, axis=0) / n # compute the (n x 4) matrix A of the centered matches A = X - np.tile(centroid, (n, 1)) # the solution vector is given by the singular vector corresponding to the smallest # singular value of matrix A which is the last row of matrix VT U, s, VT = np.linalg.svd(A) f = VT[-1, :] # reshape values to obtain the affine fundamental matrix accoring to Hartley and Zisserman MVG F = np.zeros((3, 3)) F[0, 2] = f[0] F[1, 2] = f[1] F[2, 0] = f[2] F[2, 1] = f[3] F[2, 2] = -np.dot(f, centroid) return F