SLAM Introduction (Visual SLAM Introduction)
SLAM Introduction (Visual SLAM Introduction)¶
Overview¶
SLAM (Simultaneous Localization and Mapping) is a technology that enables robots and autonomous systems to build maps while simultaneously estimating their own position in unknown environments. This lesson covers the fundamentals of Visual SLAM, LiDAR SLAM, and Loop Closure.
Difficulty: ββββ
Prerequisites: 3D vision, feature detection/matching, camera calibration, basic probability theory
Table of Contents¶
- SLAM Overview
- Visual Odometry
- ORB-SLAM
- LiDAR SLAM
- Loop Closure
- SLAM Implementation Practice
- Practice Problems
1. SLAM Overview¶
What is SLAM?¶
SLAM (Simultaneous Localization and Mapping):
Simultaneous localization and mapping
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β Key Questions: β
β "How can you know your position without a map?" β
β "How can you build a map without knowing your position?" β
β β
β β Solve both simultaneously! (Chicken and egg problem) β
β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β Sensor Data β β
β β (Camera, LiDAR, IMU) β β
β β β β β
β β βΌ β β
β β ββββββββββββββββ β β
β β β SLAM β β β
β β β Algorithm β β β
β β ββββββββ¬ββββββββ β β
β β β β β
β β ββββββββ΄ββββββββ β β
β β β β β β
β β βΌ βΌ β β
β β βββββββββββ βββββββββββ β β
β β β Map β β Pose β β β
β β β (Map) β β (Pose) β β β
β β βββββββββββ βββββββββββ β β
β β β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Applications:
βββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββ
β Field β Examples β
βββββββββββββββββββΌββββββββββββββββββββββββββββββββββββββββββ€
β Autonomous β Cars, drones, delivery robots β
β Driving β β
βββββββββββββββββββΌββββββββββββββββββββββββββββββββββββββββββ€
β Augmented β ARKit, ARCore, HoloLens β
β Reality β β
βββββββββββββββββββΌββββββββββββββββββββββββββββββββββββββββββ€
β Robot Vacuum β Roomba, Roborock β
β Cleaners β β
βββββββββββββββββββΌββββββββββββββββββββββββββββββββββββββββββ€
β 3D Scanning β Architecture, cultural heritage β
β β restoration β
βββββββββββββββββββΌββββββββββββββββββββββββββββββββββββββββββ€
β Navigation β Indoor localization β
βββββββββββββββββββ΄ββββββββββββββββββββββββββββββββββββββββββ
SLAM Classification¶
SLAM Method Classification:
1. Sensor-based Classification
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β Visual SLAM (V-SLAM) β
β - Camera (monocular, stereo, RGB-D) β
β - Feature-based or direct methods β
β - Examples: ORB-SLAM, LSD-SLAM, DSO β
β β
β LiDAR SLAM β
β - Laser scanner β
β - Point cloud matching β
β - Examples: Cartographer, LOAM, LeGO-LOAM β
β β
β Visual-Inertial SLAM β
β - Camera + IMU fusion β
β - Examples: VINS-Mono, OKVIS, MSCKF β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2. Methodology-based Classification
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β Filter-based β
β - EKF-SLAM, UKF-SLAM β
β - Real-time updates β
β - Linearization error accumulation issues β
β β
β Graph-based β
β - Pose graph optimization β
β - Bundle adjustment β
β - More accurate but computationally expensive β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
3. Front-end/Back-end
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β Front-end β
β - Sensor data processing β
β - Feature extraction and matching β
β - Initial pose estimation β
β - Loop closure detection β
β β
β Back-end β
β - Global optimization β
β - Graph optimization β
β - Uncertainty estimation β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2. Visual Odometry¶
Visual Odometry Concept¶
Visual Odometry (VO):
Estimating camera motion from consecutive images
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β Frame t-1 Frame t Frame t+1 β
β βββββββββ βββββββββ βββββββββ β
β β π· βββTβββββΆβ π· βββTβββββΆβ π· β β
β βββββββββ βββββββββ βββββββββ β
β β
β Accumulated Pose: P_t = Tβ * Tβ * ... * T_t β
β β
β Problems: β
β - Accumulated drift β
β - Scale ambiguity (monocular camera) β
β - Vulnerable to fast motion β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
VO Pipeline:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β 1. Image Acquisition β
β βΌ β
β 2. Feature Extraction (ORB, SIFT, Harris corners) β
β βΌ β
β 3. Feature Matching/Tracking (BF Matcher, Optical Flow) β
β βΌ β
β 4. Motion Estimation (Essential Matrix, PnP) β
β βΌ β
β 5. Local Optimization (Local BA) β
β βΌ β
β 6. Pose Update β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Monocular Visual Odometry Implementation¶
import cv2
import numpy as np
class MonocularVO:
"""Monocular Visual Odometry"""
def __init__(self, K, detector='ORB'):
"""
K: Camera intrinsic parameter matrix
detector: Feature detector ('ORB', 'SIFT', 'FAST')
"""
self.K = K
self.focal = K[0, 0]
self.pp = (K[0, 2], K[1, 2]) # principal point
# Feature detector
if detector == 'ORB':
self.detector = cv2.ORB_create(3000)
elif detector == 'SIFT':
self.detector = cv2.SIFT_create(3000)
else:
self.detector = cv2.FastFeatureDetector_create(threshold=25)
# Optical flow parameters
self.lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
)
# State
self.prev_frame = None
self.prev_pts = None
self.cur_R = np.eye(3)
self.cur_t = np.zeros((3, 1))
self.trajectory = []
def detect_features(self, img):
"""Detect features"""
if hasattr(self.detector, 'detectAndCompute'):
kp, _ = self.detector.detectAndCompute(img, None)
else:
kp = self.detector.detect(img, None)
pts = np.array([p.pt for p in kp], dtype=np.float32)
return pts.reshape(-1, 1, 2)
def track_features(self, prev_img, cur_img, prev_pts):
"""Track features using optical flow"""
cur_pts, status, err = cv2.calcOpticalFlowPyrLK(
prev_img, cur_img, prev_pts, None, **self.lk_params
)
status = status.reshape(-1)
prev_pts = prev_pts[status == 1]
cur_pts = cur_pts[status == 1]
return prev_pts, cur_pts
def estimate_pose(self, pts1, pts2):
"""Estimate pose using Essential Matrix"""
E, mask = cv2.findEssentialMat(
pts1, pts2, self.K,
method=cv2.RANSAC,
prob=0.999,
threshold=1.0
)
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, self.K)
return R, t
def process_frame(self, frame):
"""Process frame"""
# Convert to grayscale
if len(frame.shape) == 3:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
else:
gray = frame
if self.prev_frame is None:
# First frame
self.prev_frame = gray
self.prev_pts = self.detect_features(gray)
return self.cur_R, self.cur_t
# Track features
if self.prev_pts is not None and len(self.prev_pts) > 0:
prev_pts, cur_pts = self.track_features(
self.prev_frame, gray, self.prev_pts
)
if len(prev_pts) >= 8:
# Estimate pose
R, t = self.estimate_pose(
prev_pts.reshape(-1, 2),
cur_pts.reshape(-1, 2)
)
# Accumulate pose
self.cur_t = self.cur_t + self.cur_R @ t
self.cur_R = R @ self.cur_R
# Detect new features if needed
if len(cur_pts) < 1000:
new_pts = self.detect_features(gray)
if len(cur_pts) > 0:
self.prev_pts = np.vstack([
cur_pts.reshape(-1, 1, 2),
new_pts
])
else:
self.prev_pts = new_pts
else:
self.prev_pts = cur_pts.reshape(-1, 1, 2)
else:
self.prev_pts = self.detect_features(gray)
else:
self.prev_pts = self.detect_features(gray)
self.prev_frame = gray
# Save trajectory
self.trajectory.append(self.cur_t.copy())
return self.cur_R, self.cur_t
def get_trajectory(self):
"""Return trajectory"""
return np.array([t.ravel() for t in self.trajectory])
# Usage example
K = np.array([
[718.856, 0, 607.1928],
[0, 718.856, 185.2157],
[0, 0, 1]
], dtype=np.float32)
vo = MonocularVO(K)
cap = cv2.VideoCapture('driving.mp4')
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
R, t = vo.process_frame(frame)
# Print current position
x, y, z = t.ravel()
print(f"Position: x={x:.2f}, y={y:.2f}, z={z:.2f}")
cap.release()
# Visualize trajectory
trajectory = vo.get_trajectory()
Stereo Visual Odometry¶
class StereoVO:
"""Stereo Visual Odometry"""
def __init__(self, K, baseline, detector='ORB'):
self.K = K
self.baseline = baseline
self.focal = K[0, 0]
self.detector = cv2.ORB_create(3000)
self.bf = cv2.BFMatcher(cv2.NORM_HAMMING)
# Stereo matcher
self.stereo = cv2.StereoSGBM_create(
minDisparity=0,
numDisparities=128,
blockSize=5,
P1=8 * 3 * 5 ** 2,
P2=32 * 3 * 5 ** 2
)
self.prev_pts_3d = None
self.prev_kp = None
self.prev_desc = None
self.cur_R = np.eye(3)
self.cur_t = np.zeros((3, 1))
def compute_depth(self, left, right):
"""Compute depth using stereo matching"""
disparity = self.stereo.compute(left, right).astype(np.float32) / 16.0
# Disparity β depth
depth = np.zeros_like(disparity)
valid = disparity > 0
depth[valid] = self.focal * self.baseline / disparity[valid]
return depth
def get_3d_points(self, kp, depth):
"""Convert 2D keypoints to 3D"""
fx = self.K[0, 0]
fy = self.K[1, 1]
cx = self.K[0, 2]
cy = self.K[1, 2]
pts_3d = []
valid_indices = []
for i, pt in enumerate(kp):
x, y = int(pt.pt[0]), int(pt.pt[1])
if 0 <= x < depth.shape[1] and 0 <= y < depth.shape[0]:
z = depth[y, x]
if z > 0 and z < 100: # Valid depth
X = (pt.pt[0] - cx) * z / fx
Y = (pt.pt[1] - cy) * z / fy
pts_3d.append([X, Y, z])
valid_indices.append(i)
return np.array(pts_3d), valid_indices
def process_frame(self, left, right):
"""Process stereo frame"""
# Convert to grayscale
gray_left = cv2.cvtColor(left, cv2.COLOR_BGR2GRAY)
gray_right = cv2.cvtColor(right, cv2.COLOR_BGR2GRAY)
# Compute depth
depth = self.compute_depth(gray_left, gray_right)
# Detect features
kp, desc = self.detector.detectAndCompute(gray_left, None)
# Compute 3D points
pts_3d, valid_idx = self.get_3d_points(kp, depth)
if self.prev_pts_3d is None:
self.prev_pts_3d = pts_3d
self.prev_kp = [kp[i] for i in valid_idx]
self.prev_desc = desc[valid_idx]
return self.cur_R, self.cur_t
# Match with previous frame
matches = self.bf.knnMatch(self.prev_desc, desc[valid_idx], k=2)
good_matches = []
for m, n in matches:
if m.distance < 0.7 * n.distance:
good_matches.append(m)
if len(good_matches) >= 6:
# 3D-2D correspondences
obj_points = np.array([
self.prev_pts_3d[m.queryIdx] for m in good_matches
])
img_points = np.array([
kp[valid_idx[m.trainIdx]].pt for m in good_matches
])
# Estimate pose using PnP
success, rvec, tvec, inliers = cv2.solvePnPRansac(
obj_points, img_points, self.K, None
)
if success and inliers is not None and len(inliers) > 10:
R, _ = cv2.Rodrigues(rvec)
# Accumulate pose
self.cur_t = self.cur_t + self.cur_R @ tvec
self.cur_R = R @ self.cur_R
# Update state
self.prev_pts_3d = pts_3d
self.prev_kp = [kp[i] for i in valid_idx]
self.prev_desc = desc[valid_idx]
return self.cur_R, self.cur_t
3. ORB-SLAM¶
ORB-SLAM Overview¶
ORB-SLAM Architecture:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β ORB-SLAM: Most widely used Visual SLAM system β
β β
β Versions: β
β - ORB-SLAM (2015): Monocular β
β - ORB-SLAM2 (2017): Monocular/Stereo/RGB-D β
β - ORB-SLAM3 (2021): Visual-Inertial, multi-map β
β β
β Three parallel threads: β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β βββββββββββββββ βββββββββββββββ βββββββββββββββ β β
β β β Tracking β βLocal Mappingβ βLoop Closing β β β
β β β Thread β β Thread β β Thread β β β
β β ββββββββ¬βββββββ ββββββββ¬βββββββ ββββββββ¬βββββββ β β
β β β β β β β
β β β Keyframes β β β β
β β βββββββββββββββββΆβ β β β
β β β Keyframes β β β
β β βββββββββββββββββΆβ β β
β β β β β
β β ββββββββββββββββββββββββββββββββββββββββββ β β
β β β Map (MapPoints) ββ β β
β β β & Covisibility Graph ββ β β
β β ββββββββββββββββββββββββββββββββββββββββββ β β
β β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Tracking Thread:
- Process every frame
- ORB feature extraction
- Match with previous frame or map
- Initial pose estimation
- Keyframe decision
Local Mapping Thread:
- Insert new keyframes
- Cull recent MapPoints
- Create new MapPoints
- Local Bundle Adjustment
- Remove redundant keyframes
Loop Closing Thread:
- Detect loop candidates (DBoW2)
- Verify and correct loops
- Essential Graph optimization
- Global Bundle Adjustment
ORB Features and Bag of Words¶
import cv2
import numpy as np
class ORBVocabulary:
"""ORB-based Bag of Words"""
def __init__(self, num_words=1000):
self.orb = cv2.ORB_create(1000)
self.num_words = num_words
self.vocabulary = None
self.bf = cv2.BFMatcher(cv2.NORM_HAMMING)
def train(self, images):
"""Train vocabulary from images"""
all_descriptors = []
for img in images:
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
_, desc = self.orb.detectAndCompute(gray, None)
if desc is not None:
all_descriptors.append(desc)
all_desc = np.vstack(all_descriptors)
# K-means clustering
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER,
100, 0.2)
_, labels, centers = cv2.kmeans(
all_desc.astype(np.float32),
self.num_words,
None,
criteria,
10,
cv2.KMEANS_RANDOM_CENTERS
)
self.vocabulary = centers.astype(np.uint8)
print(f"Vocabulary created: {self.num_words} words")
def compute_bow(self, img):
"""Compute BoW vector for image"""
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
_, desc = self.orb.detectAndCompute(gray, None)
if desc is None:
return np.zeros(self.num_words)
# Assign each descriptor to nearest vocabulary word
matches = self.bf.match(desc, self.vocabulary)
bow = np.zeros(self.num_words)
for m in matches:
bow[m.trainIdx] += 1
# Normalize
bow = bow / (np.linalg.norm(bow) + 1e-6)
return bow
def compute_similarity(self, bow1, bow2):
"""Similarity between two BoW vectors"""
return np.dot(bow1, bow2)
class SimpleSLAM:
"""Simple SLAM system (ORB-SLAM concept)"""
def __init__(self, K):
self.K = K
self.orb = cv2.ORB_create(2000)
self.bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
# Map
self.keyframes = [] # Keyframe list
self.map_points = [] # 3D points
self.poses = [] # Keyframe poses
# Current state
self.cur_R = np.eye(3)
self.cur_t = np.zeros((3, 1))
self.prev_frame = None
self.prev_kp = None
self.prev_desc = None
# Keyframe criteria
self.kf_threshold = 30 # Minimum matches
def is_keyframe(self, num_matches, motion):
"""Decide if keyframe"""
# Simple criteria: keyframe if few matches or large motion
translation = np.linalg.norm(motion)
if num_matches < self.kf_threshold or translation > 0.5:
return True
return False
def add_keyframe(self, frame, kp, desc, pose):
"""Add keyframe"""
keyframe = {
'frame': frame.copy(),
'keypoints': kp,
'descriptors': desc,
'pose': pose.copy()
}
self.keyframes.append(keyframe)
self.poses.append(pose)
print(f"Keyframe added: total {len(self.keyframes)}")
def process_frame(self, frame):
"""Process frame (Tracking)"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
kp, desc = self.orb.detectAndCompute(gray, None)
if self.prev_frame is None:
# First frame β keyframe
pose = {'R': np.eye(3), 't': np.zeros((3, 1))}
self.add_keyframe(gray, kp, desc, pose)
self.prev_frame = gray
self.prev_kp = kp
self.prev_desc = desc
return self.cur_R, self.cur_t
# Match with previous frame
matches = self.bf.match(self.prev_desc, desc)
matches = sorted(matches, key=lambda x: x.distance)[:500]
if len(matches) >= 8:
# Extract matched points
pts1 = np.float32([self.prev_kp[m.queryIdx].pt for m in matches])
pts2 = np.float32([kp[m.trainIdx].pt for m in matches])
# Estimate pose using Essential Matrix
E, mask = cv2.findEssentialMat(pts1, pts2, self.K)
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, self.K)
# Accumulate pose
self.cur_t = self.cur_t + self.cur_R @ t
self.cur_R = R @ self.cur_R
# Check keyframe
if self.is_keyframe(len(matches), t):
pose = {'R': self.cur_R.copy(), 't': self.cur_t.copy()}
self.add_keyframe(gray, kp, desc, pose)
# Update state
self.prev_frame = gray
self.prev_kp = kp
self.prev_desc = desc
return self.cur_R, self.cur_t
def get_camera_trajectory(self):
"""Return camera trajectory"""
trajectory = []
for pose in self.poses:
R = pose['R']
t = pose['t']
# Camera position = -R^T * t
pos = -R.T @ t
trajectory.append(pos.ravel())
return np.array(trajectory)
4. LiDAR SLAM¶
LiDAR SLAM Overview¶
LiDAR SLAM:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β LiDAR Sensor Characteristics: β
β - 360-degree scanning β
β - Accurate distance measurement β
β - Robust to lighting conditions β
β - Rich 3D point clouds β
β β
β LiDAR Types: β
β ββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββ β
β β 2D LiDAR β Planar scan, affordable, robot β β
β β (e.g., RPLiDAR) β vacuum cleaners β β
β ββββββββββββββββββββΌββββββββββββββββββββββββββββββββββββββ€ β
β β 3D LiDAR β 3D point clouds, autonomous β β
β β (e.g., Velodyne) β driving β β
β ββββββββββββββββββββΌββββββββββββββββββββββββββββββββββββββ€ β
β β Solid-State β Non-rotating, compact, latest β β
β β (e.g., Livox) β trend β β
β ββββββββββββββββββββ΄ββββββββββββββββββββββββββββββββββββββ β
β β
β Key Algorithms: β
β - ICP (Iterative Closest Point) β
β - NDT (Normal Distributions Transform) β
β - LOAM (LiDAR Odometry and Mapping) β
β - LeGO-LOAM (Lightweight Ground-Optimized) β
β - Cartographer (Google) β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
ICP (Iterative Closest Point)¶
import numpy as np
from scipy.spatial import KDTree
def icp(source, target, max_iterations=50, tolerance=1e-6):
"""
ICP algorithm for aligning two point clouds
Parameters:
source: Source point cloud (N x 3)
target: Target point cloud (M x 3)
Returns:
R: Rotation matrix (3 x 3)
t: Translation vector (3,)
transformed: Transformed source points
"""
src = source.copy()
prev_error = float('inf')
R_total = np.eye(3)
t_total = np.zeros(3)
# KD-Tree for efficient nearest neighbor search
tree = KDTree(target)
for i in range(max_iterations):
# 1. Find nearest correspondences
distances, indices = tree.query(src)
correspondences = target[indices]
# 2. Estimate transformation (SVD)
src_centroid = np.mean(src, axis=0)
tgt_centroid = np.mean(correspondences, axis=0)
src_centered = src - src_centroid
tgt_centered = correspondences - tgt_centroid
H = src_centered.T @ tgt_centered
U, _, Vt = np.linalg.svd(H)
R = Vt.T @ U.T
# Correct reflection
if np.linalg.det(R) < 0:
Vt[-1, :] *= -1
R = Vt.T @ U.T
t = tgt_centroid - R @ src_centroid
# 3. Apply transformation
src = (R @ src.T).T + t
# Accumulate transformation
R_total = R @ R_total
t_total = R @ t_total + t
# 4. Check convergence
mean_error = np.mean(distances)
if abs(prev_error - mean_error) < tolerance:
print(f"ICP converged: {i+1} iterations, error: {mean_error:.6f}")
break
prev_error = mean_error
return R_total, t_total, src
class LiDARSLAM:
"""Simple 2D LiDAR SLAM"""
def __init__(self, map_resolution=0.05):
self.resolution = map_resolution
self.pose = np.array([0.0, 0.0, 0.0]) # x, y, theta
self.trajectory = [self.pose.copy()]
# Occupancy grid map
self.map_size = 1000
self.occupancy_map = np.ones((self.map_size, self.map_size)) * 0.5
self.map_origin = np.array([self.map_size // 2, self.map_size // 2])
def scan_to_points(self, scan_ranges, scan_angles):
"""Convert scan data to 2D points"""
valid = (scan_ranges > 0.1) & (scan_ranges < 30.0)
ranges = scan_ranges[valid]
angles = scan_angles[valid]
x = ranges * np.cos(angles)
y = ranges * np.sin(angles)
return np.column_stack([x, y])
def transform_points(self, points, pose):
"""Transform points to world coordinates"""
x, y, theta = pose
R = np.array([
[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]
])
transformed = (R @ points.T).T + np.array([x, y])
return transformed
def point_to_grid(self, points):
"""Convert points to grid coordinates"""
grid_x = (points[:, 0] / self.resolution + self.map_origin[0]).astype(int)
grid_y = (points[:, 1] / self.resolution + self.map_origin[1]).astype(int)
# Limit to map bounds
valid = (grid_x >= 0) & (grid_x < self.map_size) & \
(grid_y >= 0) & (grid_y < self.map_size)
return grid_x[valid], grid_y[valid], valid
def update_map(self, scan_points, pose):
"""Update occupancy grid map"""
world_points = self.transform_points(scan_points, pose)
gx, gy, valid = self.point_to_grid(world_points)
# Update occupancy probability (log odds)
self.occupancy_map[gy, gx] = np.clip(
self.occupancy_map[gy, gx] + 0.1, 0, 1
)
def match_scan(self, current_points, previous_points):
"""Estimate relative motion using scan matching"""
if len(previous_points) < 10 or len(current_points) < 10:
return np.array([0, 0, 0])
# Apply ICP
R, t, _ = icp(current_points, previous_points)
# Extract theta in 2D
theta = np.arctan2(R[1, 0], R[0, 0])
return np.array([t[0], t[1], theta])
def process_scan(self, scan_ranges, scan_angles, prev_scan=None):
"""Process scan"""
current_points = self.scan_to_points(scan_ranges, scan_angles)
if prev_scan is not None:
prev_points = self.scan_to_points(prev_scan[0], prev_scan[1])
# Scan matching
delta_pose = self.match_scan(current_points, prev_points)
# Update pose
self.pose[2] += delta_pose[2]
R = np.array([
[np.cos(self.pose[2]), -np.sin(self.pose[2])],
[np.sin(self.pose[2]), np.cos(self.pose[2])]
])
self.pose[:2] += R @ delta_pose[:2]
# Update map
self.update_map(current_points, self.pose)
# Save trajectory
self.trajectory.append(self.pose.copy())
return self.pose
def get_occupancy_map(self):
"""Return occupancy map"""
return self.occupancy_map
def get_trajectory(self):
"""Return trajectory"""
return np.array(self.trajectory)
5. Loop Closure¶
Loop Closure Concept¶
Loop Closure:
Recognizing previously visited places to correct accumulated drift
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β Problem: Drift (accumulated error) β
β β
β Actual Path Estimated Path (with drift) β
β βββββββββββ βββββββββββ β
β β β β β² β
β β β β β² β
β β β β β² β
β βββββββββββ ββββββββββββββ² β
β (closed loop) (open curve) β
β β
β Solution: Loop Closure β
β 1. Detect if current location was visited before β
β 2. Add loop constraint β
β 3. Pose graph optimization β
β β
β βββββββββββ β
β β ββββββ β Loop detection β
β β β β β
β β β β β Graph optimization β
β β ββββββ β
β βββββββββββ β
β (corrected path) β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Loop Closure Implementation¶
import cv2
import numpy as np
from collections import deque
class LoopClosureDetector:
"""Bag of Words-based loop closure detection"""
def __init__(self, vocabulary_size=1000, min_score=0.3):
self.orb = cv2.ORB_create(2000)
self.bf = cv2.BFMatcher(cv2.NORM_HAMMING)
self.vocabulary = None
self.vocabulary_size = vocabulary_size
self.min_score = min_score
# Keyframe database
self.keyframe_bows = []
self.keyframe_descs = []
self.keyframe_kps = []
# Exclude recent N keyframes from loop candidates
self.temporal_window = 30
def build_vocabulary(self, training_images):
"""Build vocabulary"""
all_descriptors = []
for img in training_images:
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
_, desc = self.orb.detectAndCompute(gray, None)
if desc is not None:
all_descriptors.append(desc)
all_desc = np.vstack(all_descriptors).astype(np.float32)
# K-means
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER,
100, 0.2)
_, _, self.vocabulary = cv2.kmeans(
all_desc, self.vocabulary_size, None,
criteria, 10, cv2.KMEANS_RANDOM_CENTERS
)
self.vocabulary = self.vocabulary.astype(np.uint8)
def compute_bow(self, descriptors):
"""Compute BoW vector"""
if self.vocabulary is None or descriptors is None:
return None
matches = self.bf.match(descriptors, self.vocabulary)
bow = np.zeros(self.vocabulary_size)
for m in matches:
bow[m.trainIdx] += 1
# L2 normalization
norm = np.linalg.norm(bow)
if norm > 0:
bow = bow / norm
return bow
def add_keyframe(self, frame):
"""Add keyframe"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
kp, desc = self.orb.detectAndCompute(gray, None)
if desc is None:
return -1
bow = self.compute_bow(desc)
self.keyframe_bows.append(bow)
self.keyframe_descs.append(desc)
self.keyframe_kps.append(kp)
return len(self.keyframe_bows) - 1
def detect_loop(self, query_idx):
"""Detect loop candidates"""
if query_idx < self.temporal_window + 1:
return None, 0
query_bow = self.keyframe_bows[query_idx]
best_match = -1
best_score = 0
# Search only temporally distant keyframes
for i in range(query_idx - self.temporal_window):
score = np.dot(query_bow, self.keyframe_bows[i])
if score > best_score and score > self.min_score:
best_score = score
best_match = i
if best_match >= 0:
return best_match, best_score
return None, 0
def verify_loop(self, query_idx, candidate_idx, min_inliers=50):
"""Verify loop using geometric verification"""
desc1 = self.keyframe_descs[query_idx]
desc2 = self.keyframe_descs[candidate_idx]
kp1 = self.keyframe_kps[query_idx]
kp2 = self.keyframe_kps[candidate_idx]
# Feature matching
matches = self.bf.knnMatch(desc1, desc2, k=2)
good_matches = []
for m, n in matches:
if m.distance < 0.75 * n.distance:
good_matches.append(m)
if len(good_matches) < 8:
return False, None
# Geometric verification using Fundamental Matrix
pts1 = np.float32([kp1[m.queryIdx].pt for m in good_matches])
pts2 = np.float32([kp2[m.trainIdx].pt for m in good_matches])
F, mask = cv2.findFundamentalMat(pts1, pts2, cv2.FM_RANSAC)
if mask is None:
return False, None
num_inliers = np.sum(mask)
if num_inliers >= min_inliers:
return True, {
'query_idx': query_idx,
'match_idx': candidate_idx,
'inliers': num_inliers,
'pts1': pts1[mask.ravel() == 1],
'pts2': pts2[mask.ravel() == 1]
}
return False, None
class PoseGraphOptimizer:
"""Simple pose graph optimization"""
def __init__(self):
self.poses = [] # Nodes (poses)
self.edges = [] # Edges (relative transforms)
self.loop_constraints = [] # Loop constraints
def add_pose(self, pose):
"""Add pose node"""
self.poses.append(pose.copy())
return len(self.poses) - 1
def add_odometry_edge(self, i, j, relative_pose, info_matrix=None):
"""Add odometry edge"""
if info_matrix is None:
info_matrix = np.eye(3)
self.edges.append({
'from': i,
'to': j,
'measurement': relative_pose,
'info': info_matrix
})
def add_loop_constraint(self, i, j, relative_pose, info_matrix=None):
"""Add loop constraint"""
if info_matrix is None:
# Loop constraints have high weight
info_matrix = np.eye(3) * 100
self.loop_constraints.append({
'from': i,
'to': j,
'measurement': relative_pose,
'info': info_matrix
})
def optimize(self, num_iterations=10):
"""Graph optimization (Gauss-Newton)"""
# Simple implementation (in practice, use g2o, Ceres, etc.)
print("Pose graph optimization recommended to use specialized libraries like g2o")
# Simple correction using loop constraints
for constraint in self.loop_constraints:
i = constraint['from']
j = constraint['to']
# Calculate accumulated drift
drift = self.poses[j][:2] - self.poses[i][:2]
drift -= constraint['measurement'][:2]
# Distribute drift using linear interpolation
for k in range(i, j + 1):
alpha = (k - i) / (j - i) if j > i else 0
self.poses[k][:2] -= alpha * drift
return self.poses
6. SLAM Implementation Practice¶
Simple SLAM System¶
import cv2
import numpy as np
class SimpleVSLAM:
"""Simple Visual SLAM system"""
def __init__(self, K):
self.K = K
# Modules
self.vo = MonocularVO(K)
self.loop_detector = LoopClosureDetector()
self.pose_graph = PoseGraphOptimizer()
# State
self.frame_count = 0
self.keyframe_interval = 10
def process_frame(self, frame):
"""Process frame"""
self.frame_count += 1
# Visual Odometry
R, t = self.vo.process_frame(frame)
# Add keyframe
if self.frame_count % self.keyframe_interval == 0:
kf_idx = self.loop_detector.add_keyframe(frame)
# Add node to pose graph
pose = np.array([t[0, 0], t[1, 0], 0]) # 2D approximation
node_idx = self.pose_graph.add_pose(pose)
# Connect edge with previous keyframe
if node_idx > 0:
prev_pose = self.pose_graph.poses[node_idx - 1]
relative = pose - prev_pose
self.pose_graph.add_odometry_edge(
node_idx - 1, node_idx, relative
)
# Loop detection
if kf_idx > 30: # After sufficient keyframes
candidate, score = self.loop_detector.detect_loop(kf_idx)
if candidate is not None:
verified, loop_info = self.loop_detector.verify_loop(
kf_idx, candidate
)
if verified:
print(f"Loop detected: {kf_idx} -> {candidate}")
# Add loop constraint
relative = pose - self.pose_graph.poses[candidate]
self.pose_graph.add_loop_constraint(
candidate, node_idx, relative
)
# Optimize
self.pose_graph.optimize()
return R, t
def get_map(self):
"""Return map"""
return self.vo.get_trajectory()
def get_optimized_trajectory(self):
"""Return optimized trajectory"""
return np.array(self.pose_graph.poses)
Visualization¶
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
def visualize_slam_result(trajectory, loop_closures=None):
"""Visualize SLAM results"""
fig = plt.figure(figsize=(12, 5))
# 2D trajectory
ax1 = fig.add_subplot(121)
ax1.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=1)
ax1.scatter(trajectory[0, 0], trajectory[0, 1],
c='green', s=100, marker='o', label='Start')
ax1.scatter(trajectory[-1, 0], trajectory[-1, 1],
c='red', s=100, marker='x', label='End')
if loop_closures:
for lc in loop_closures:
i, j = lc['from'], lc['to']
ax1.plot([trajectory[i, 0], trajectory[j, 0]],
[trajectory[i, 1], trajectory[j, 1]],
'g--', linewidth=2, alpha=0.5)
ax1.set_xlabel('X')
ax1.set_ylabel('Y')
ax1.set_title('2D Trajectory')
ax1.legend()
ax1.axis('equal')
ax1.grid(True)
# 3D trajectory
ax2 = fig.add_subplot(122, projection='3d')
ax2.plot(trajectory[:, 0], trajectory[:, 1], trajectory[:, 2],
'b-', linewidth=1)
ax2.set_xlabel('X')
ax2.set_ylabel('Y')
ax2.set_zlabel('Z')
ax2.set_title('3D Trajectory')
plt.tight_layout()
plt.show()
def visualize_occupancy_map(occupancy_map, trajectory=None):
"""Visualize occupancy map"""
plt.figure(figsize=(10, 10))
# Display map
plt.imshow(occupancy_map, cmap='gray', origin='lower')
# Overlay trajectory
if trajectory is not None:
# Convert to map coordinates
map_center = occupancy_map.shape[0] // 2
resolution = 0.05
traj_map = trajectory / resolution + map_center
plt.plot(traj_map[:, 0], traj_map[:, 1], 'r-', linewidth=2)
plt.scatter(traj_map[0, 0], traj_map[0, 1], c='green', s=100)
plt.scatter(traj_map[-1, 0], traj_map[-1, 1], c='blue', s=100)
plt.title('Occupancy Grid Map')
plt.xlabel('X')
plt.ylabel('Y')
plt.colorbar(label='Occupancy Probability')
plt.show()
7. Practice Problems¶
Problem 1: Visual Odometry Implementation¶
Implement monocular Visual Odometry.
Requirements: - ORB feature detection - Optical flow or descriptor matching - Pose estimation using Essential Matrix - Trajectory visualization
Hint
# Essential Matrix
E, mask = cv2.findEssentialMat(pts1, pts2, K)
_, R, t, _ = cv2.recoverPose(E, pts1, pts2, K)
# Accumulate pose
cur_t = cur_t + cur_R @ t
cur_R = R @ cur_R
Problem 2: Loop Closure Detection¶
Implement BoW-based loop closure.
Requirements: - Build ORB vocabulary - Compute BoW vectors - Detect candidates based on similarity - Geometric verification
Hint
# BoW similarity
score = np.dot(bow1, bow2)
# Geometric verification
F, mask = cv2.findFundamentalMat(pts1, pts2, cv2.FM_RANSAC)
inliers = np.sum(mask)
Problem 3: ICP Implementation¶
Implement the ICP algorithm.
Requirements: - Nearest correspondence search - Estimate transformation using SVD - Iterative optimization - Convergence criteria
Hint
# Calculate R, t using SVD
H = src_centered.T @ tgt_centered
U, _, Vt = np.linalg.svd(H)
R = Vt.T @ U.T
t = tgt_centroid - R @ src_centroid
Problem 4: Occupancy Grid Map¶
Create an occupancy grid map from LiDAR data.
Requirements: - Convert scan data to points - Grid coordinate transformation - Update occupancy probabilities - Visualize map
Hint
# Log odds update
log_odds = np.log(p / (1 - p))
log_odds[occupied] += 0.5
log_odds[free] -= 0.2
p = 1 / (1 + np.exp(-log_odds))
Problem 5: Complete SLAM System¶
Implement a SLAM system integrating VO, loop closure, and mapping.
Requirements: - Keyframe management - Loop detection and verification - Pose graph optimization - 3D map generation
Hint
# Integrated system
class SLAM:
def process(self, frame):
# 1. Tracking
pose = self.track(frame)
# 2. Update map if keyframe
if self.is_keyframe():
self.local_mapping()
# 3. Loop detection
if self.detect_loop():
self.optimize_graph()
Next Steps¶
- Use real SLAM libraries (ORB-SLAM3, RTAB-Map)
- ROS integration
- Visual-Inertial SLAM
- Deep learning-based SLAM