Video Processing

Video Processing

Overview

A video is a sequence of consecutive image frames. We will learn to process video files and camera streams using OpenCV, and explore motion analysis methods using background subtraction and optical flow.

Difficulty: ***

Prerequisites: Basic image operations, filtering, object detection


Table of Contents

  1. VideoCapture: Files and Cameras
  2. VideoWriter: Saving Video
  3. Frame-by-frame Processing
  4. FPS Calculation
  5. Background Subtraction (MOG2, KNN)
  6. Optical Flow
  7. Object Tracking
  8. Practice Problems

1. VideoCapture: Files and Cameras

Understanding Video Structure

Video = Sequence of consecutive image frames

Time ------------------------------------------>
    +-----++-----++-----++-----++-----+
    |Frame||Frame||Frame||Frame||Frame| ...
    |  1  ||  2  ||  3  ||  4  ||  5  |
    +-----++-----++-----++-----++-----+

FPS (Frames Per Second): Number of frames per second
- 24 FPS: Movie standard
- 30 FPS: General video
- 60 FPS: Gaming, sports
- 120+ FPS: Slow motion

Resolution: Size of each frame
- 640x480: VGA
- 1280x720: HD (720p)
- 1920x1080: Full HD (1080p)
- 3840x2160: 4K

Reading Video Files

import cv2

# Open video file
cap = cv2.VideoCapture('video.mp4')

# Check if opened successfully
if not cap.isOpened():
    print("Cannot open video")
    exit()

# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# cap.get() yields 0 for a property the backend cannot report, so guard the
# division instead of crashing with ZeroDivisionError on such files.
duration = frame_count / fps if fps > 0 else 0.0

print(f"Resolution: {width}x{height}")
print(f"FPS: {fps}")
print(f"Total frames: {frame_count}")
print(f"Duration: {duration:.2f} seconds")

# Frame reading loop
while True:
    ret, frame = cap.read()

    # ret is False both at the end of the file and on a decode error
    if not ret:
        print("End of video or error")
        break

    # Frame processing
    cv2.imshow('Video', frame)

    # Exit with 'q' key, wait 1ms (waitKey also services the GUI window)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()

Camera Input

import cv2

# Open camera (device ID: 0=default camera)
cap = cv2.VideoCapture(0)

# If camera fails to open
if not cap.isOpened():
    print("Cannot open camera")
    exit()

# Set camera properties
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)

# Set buffer size (reduce latency)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

# Read the size back: the driver may not honour the requested values
print(f"Camera resolution: {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x"
      f"{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}")

# Tolerate a few dropped frames, but give up if the camera stays silent.
# Retrying forever would spin at full speed and never reach the key check.
MAX_FAILED_READS = 30
failed_reads = 0

while True:
    ret, frame = cap.read()

    if not ret:
        failed_reads += 1
        if failed_reads >= MAX_FAILED_READS:
            print("Camera stopped delivering frames")
            break
        # Keep polling the keyboard so 'q' still works while retrying
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        continue
    failed_reads = 0

    # Horizontal flip (mirror effect)
    frame = cv2.flip(frame, 1)

    cv2.imshow('Camera', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

Key VideoCapture Properties

import cv2

cap = cv2.VideoCapture('video.mp4')

# Capture properties to report, listed by their cv2 constant name:
#   frame width / frame height / FPS / total frame count /
#   current frame position / current position (ms) / codec 4-char code /
#   brightness (camera) / contrast (camera)
property_names = (
    'CAP_PROP_FRAME_WIDTH',
    'CAP_PROP_FRAME_HEIGHT',
    'CAP_PROP_FPS',
    'CAP_PROP_FRAME_COUNT',
    'CAP_PROP_POS_FRAMES',
    'CAP_PROP_POS_MSEC',
    'CAP_PROP_FOURCC',
    'CAP_PROP_BRIGHTNESS',
    'CAP_PROP_CONTRAST',
)

# Resolve each name to the numeric property id defined on the cv2 module
properties = {name: getattr(cv2, name) for name in property_names}

# Print every property as "NAME: value"
for name, prop in properties.items():
    print(f"{name}: {cap.get(prop)}")

# Seek to specific frame (frame 100)
cap.set(cv2.CAP_PROP_POS_FRAMES, 100)

# Seek to specific time, given in milliseconds (5 seconds)
cap.set(cv2.CAP_PROP_POS_MSEC, 5000)

cap.release()

2. VideoWriter: Saving Video

Basic Video Saving

import cv2

# Video capture setup
cap = cv2.VideoCapture(0)

# Without this check a missing camera yields a 0x0 frame size below and an
# unusable writer.
if not cap.isOpened():
    print("Cannot open camera")
    exit()

# Video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = 30.0

# Codec setup (4-character code)
# 'XVID': for AVI container
# 'mp4v': for MP4 container
# 'MJPG': Motion JPEG
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# Create VideoWriter
out = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))

# The writer fails to open when the codec/container is not available
if not out.isOpened():
    print("Cannot open video writer")
    cap.release()
    exit()

print("Recording started... Press 'q' to stop")

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Save frame first, so the on-screen indicator drawn below is not recorded
    out.write(frame)

    # Recording indicator
    cv2.circle(frame, (30, 30), 10, (0, 0, 255), -1)  # Red circle
    cv2.putText(frame, 'REC', (50, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    cv2.imshow('Recording', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
out.release()
cv2.destroyAllWindows()
print("Recording complete: output.mp4")

Major Codecs

+-----------+-------------+------------------------+
|   Codec   |  Container  |      Characteristics   |
+-----------+-------------+------------------------+
| 'XVID'    | .avi        | Widely supported,      |
|           |             | decent compression     |
| 'MJPG'    | .avi        | Motion JPEG, fast      |
| 'mp4v'    | .mp4        | MPEG-4, good compat    |
| 'avc1'    | .mp4        | H.264, high compression|
| 'X264'    | .mp4        | H.264 (needs encoder)  |
| 'VP80'    | .webm       | VP8, for web           |
| 'VP90'    | .webm       | VP9, high efficiency   |
+-----------+-------------+------------------------+

# Codec test
def test_codec(codec_str, extension):
    """Report whether a VideoWriter can be opened for the given codec.

    Opens a 640x480, 30 FPS writer on 'test.<extension>' using the
    four-character code in codec_str and prints the outcome.

    Returns:
        True if the writer opened (it is released again), False otherwise.
    """
    writer = cv2.VideoWriter(
        f'test.{extension}',
        cv2.VideoWriter_fourcc(*codec_str),
        30,
        (640, 480),
    )

    if not writer.isOpened():
        print(f"{codec_str}: Not supported")
        return False

    print(f"{codec_str}: Supported")
    writer.release()
    return True

Processing and Saving Video

import cv2

def process_and_save_video(input_path, output_path, process_func):
    """Run process_func over every frame of a video and save the result.

    Args:
        input_path: Source passed to cv2.VideoCapture.
        output_path: Destination file, written with the 'mp4v' codec at the
            source's frame size and FPS.
        process_func: Callable taking a frame and returning the frame to
            write. It must return a frame the writer accepts (the writer is
            opened with the input's width and height).

    The capture and the writer are released even if process_func raises.
    """

    cap = cv2.VideoCapture(input_path)
    out = None

    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        frame_num = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Process frame
            processed = process_func(frame)

            # Save
            out.write(processed)

            # Progress display
            frame_num += 1
            if total_frames > 0:
                progress = (frame_num / total_frames) * 100
                print(f"\rProcessing: {progress:.1f}%", end='')
            else:
                # Frame count unknown (reported as 0): a percentage would
                # divide by zero, so show the running frame number instead.
                print(f"\rProcessing: frame {frame_num}", end='')

        print("\nComplete!")
    finally:
        cap.release()
        if out is not None:
            out.release()

# Usage example: Grayscale conversion and edge detection
def edge_detection(frame):
    """Return a 3-channel Canny edge image of a BGR frame.

    The frame is converted to grayscale, run through Canny with thresholds
    50 and 150, and expanded back to BGR.
    """
    edge_map = cv2.Canny(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), 50, 150)
    # Back to 3 channels (VideoWriter is set for color video)
    return cv2.cvtColor(edge_map, cv2.COLOR_GRAY2BGR)

# Usage example: write the edge-detected version of input.mp4 to edges.mp4
process_and_save_video('input.mp4', 'edges.mp4', edge_detection)

3. Frame-by-frame Processing

Frame Processing Pipeline

Frame Processing Pipeline:

Input --> Preprocessing --> Analysis --> Postprocessing --> Output
              |              |              |
              v              v              v
          - Resize       - Detection    - Visualization
          - Color conv   - Tracking     - Filtering
          - Noise        - Recognition  - Compositing
            removal

Multi-step Processing Example

import cv2
import numpy as np

class VideoProcessor:
    """Video frame processor that applies a chain of processing functions.

    Functions are registered with add_processor() and applied to each frame
    in registration order, each one receiving the previous one's output.
    """

    def __init__(self):
        # List of (name, func) pairs, in the order they are applied
        self.processors = []

    def add_processor(self, name, func):
        """Add processing function.

        Args:
            name: Label stored alongside the function.
            func: Callable taking a frame and returning the processed frame.
        """
        self.processors.append((name, func))

    def process_frame(self, frame):
        """Apply all processing functions to a copy of frame.

        The input frame itself is not modified by this method; the chain
        starts from frame.copy().
        """
        result = frame.copy()
        for _name, func in self.processors:
            result = func(result)
        return result

    @staticmethod
    def _as_bgr(frame):
        """Expand a single-channel frame to BGR; return other frames as-is."""
        if getattr(frame, 'ndim', 3) == 2:
            return cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
        return frame

    def process_video(self, input_source, output_path=None, display=True):
        """Process a video source frame by frame.

        Args:
            input_source: File path or camera index for cv2.VideoCapture.
            output_path: If given, processed frames are saved there ('mp4v').
            display: If True, show each processed frame; 'q' stops early.

        The capture, the writer and the windows are cleaned up even if a
        processing function raises.
        """
        cap = cv2.VideoCapture(input_source)
        out = None

        try:
            if output_path:
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = cap.get(cv2.CAP_PROP_FPS)
                if fps <= 0:
                    # A source that cannot report its FPS returns 0; fall
                    # back to a usable rate instead of passing 0 on.
                    fps = 30.0
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps,
                                      (width, height))

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Process
                processed = self.process_frame(frame)

                # Save. The writer is opened for colour frames, so a
                # grayscale result (e.g. from Canny) is expanded to BGR.
                if out is not None:
                    out.write(self._as_bgr(processed))

                # Display
                if display:
                    cv2.imshow('Processed', processed)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
        finally:
            cap.release()
            if out is not None:
                out.release()
            cv2.destroyAllWindows()

# Usage example
processor = VideoProcessor()

# Add processing functions (applied in the order they are added)
processor.add_processor('blur', lambda f: cv2.GaussianBlur(f, (5, 5), 0))
# Canny returns a single-channel edge map. Convert it back to BGR so the
# coloured timestamp below and the colour video writer get 3-channel frames.
processor.add_processor(
    'edge',
    lambda f: cv2.cvtColor(cv2.Canny(f, 50, 150), cv2.COLOR_GRAY2BGR)
)

def add_timestamp(frame):
    """Draw the current local date and time in green onto frame (in place)."""
    import datetime
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    cv2.putText(frame, now, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    return frame

processor.add_processor('timestamp', add_timestamp)

# Process webcam
processor.process_video(0, output_path='recorded.mp4')

Frame Skipping and Buffering

import cv2
import time

def skip_frames_processing(video_path, skip=5):
    """Run the heavy processing on every skip-th frame only.

    Every frame is still read from the video, but only frames whose
    1-based index is a multiple of skip are processed and shown.
    Pressing 'q' while a processed frame is displayed stops playback.
    """
    cap = cv2.VideoCapture(video_path)
    frames_read = 0

    while True:
        ok, frame = cap.read()
        if not ok:
            break

        frames_read += 1

        # Only every skip-th frame gets the expensive treatment
        if frames_read % skip == 0:
            result = heavy_processing(frame)

            cv2.imshow('Skipped Processing', result)
            pressed = cv2.waitKey(1) & 0xFF
            if pressed == ord('q'):
                break

    cap.release()

def buffered_reading(video_path, buffer_size=10):
    """Frame buffering (smooth playback).

    A background thread decodes frames into a bounded queue while the
    calling thread displays them roughly every 30 ms.

    Args:
        video_path: Source passed to cv2.VideoCapture.
        buffer_size: Maximum number of decoded frames held in the queue.

    Playback ends when 'q' is pressed, or when the reader has reached the
    end of the video and every buffered frame has been shown.
    """
    import queue
    from threading import Event, Thread

    cap = cv2.VideoCapture(video_path)
    frames = queue.Queue(maxsize=buffer_size)
    stop_event = Event()    # set by the display loop to stop the reader
    reader_done = Event()   # set by the reader when it will put no more

    def read_frames():
        try:
            while not stop_event.is_set():
                ret, frame = cap.read()
                if not ret:
                    break
                # Wait for room instead of dropping the frame when the
                # buffer is full; wake up regularly to notice a stop request.
                while not stop_event.is_set():
                    try:
                        frames.put(frame, timeout=0.1)
                        break
                    except queue.Full:
                        continue
        finally:
            reader_done.set()

    # Start reading thread
    thread = Thread(target=read_frames)
    thread.start()

    # Wait for initial buffer fill
    time.sleep(0.5)

    try:
        while True:
            try:
                frame = frames.get_nowait()
            except queue.Empty:
                # Nothing buffered: finished only if the reader has stopped
                # and no frame slipped in after the failed get above.
                if reader_done.is_set() and frames.empty():
                    break
            else:
                cv2.imshow('Buffered', frame)

            if cv2.waitKey(30) & 0xFF == ord('q'):
                break
    finally:
        stop_event.set()
        thread.join()
        cap.release()

4. FPS Calculation

FPS Measurement Method

import cv2
import time

class FPSCounter:
    """FPS measurement class.

    Keeps the durations of the most recent avg_frames frames and reports
    the frame rate as the inverse of their average.
    """

    def __init__(self, avg_frames=30):
        from collections import deque

        # Rolling window of frame durations in seconds; deque(maxlen=...)
        # discards the oldest entry automatically in O(1).
        self.frame_times = deque(maxlen=avg_frames)
        self.avg_frames = avg_frames
        # perf_counter is monotonic, so system clock adjustments cannot
        # produce negative or distorted frame durations.
        self.last_time = time.perf_counter()

    def update(self):
        """Call after processing each frame."""
        current_time = time.perf_counter()
        self.frame_times.append(current_time - self.last_time)
        self.last_time = current_time

    def get_fps(self):
        """Return current FPS, or 0 if nothing has been measured yet."""
        if not self.frame_times:
            return 0
        avg_time = sum(self.frame_times) / len(self.frame_times)
        return 1.0 / avg_time if avg_time > 0 else 0

# Usage example
cap = cv2.VideoCapture(0)
fps_counter = FPSCounter()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Frame processing
    # ...

    # Record this frame's duration, then read the rolling average
    fps_counter.update()
    fps = fps_counter.get_fps()

    # Display FPS
    cv2.putText(frame, f'FPS: {fps:.1f}', (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    cv2.imshow('FPS', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
# Close the preview window as well, like the other examples do
cv2.destroyAllWindows()

Processing Time Analysis

import cv2
import time

class PerformanceMonitor:
    """Named stopwatch collection for timing sections of a frame loop."""

    def __init__(self):
        # name -> {'start': timestamp[, 'elapsed': seconds]}
        self.timings = {}

    def start(self, name):
        """Begin (or restart) the timer called name."""
        started_at = time.time()
        self.timings[name] = {'start': started_at}

    def stop(self, name):
        """Finish the timer called name and return its elapsed seconds.

        Returns 0 when no timer with that name was started.
        """
        if name not in self.timings:
            return 0

        record = self.timings[name]
        record['elapsed'] = time.time() - record['start']
        return record['elapsed']

    def get_report(self):
        """Return one 'name: X.XXms' line per timer that has been stopped."""
        return '\n'.join(
            f"{label}: {record['elapsed']*1000:.2f}ms"
            for label, record in self.timings.items()
            if 'elapsed' in record
        )

# Usage example
monitor = PerformanceMonitor()

cap = cv2.VideoCapture(0)

while True:
    # Measure total frame time (includes the camera read below)
    monitor.start('total')

    ret, frame = cap.read()
    if not ret:
        break

    # Measure preprocessing time
    monitor.start('preprocess')
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    monitor.stop('preprocess')

    # Measure detection time (the result is only computed to be timed)
    monitor.start('detection')
    edges = cv2.Canny(blur, 50, 150)
    monitor.stop('detection')

    monitor.stop('total')

    # Display performance, one report line every 20 pixels
    y = 30
    for line in monitor.get_report().split('\n'):
        cv2.putText(frame, line, (10, y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        y += 20

    cv2.imshow('Performance', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
# Close the preview window as well, like the other examples do
cv2.destroyAllWindows()

5. Background Subtraction (MOG2, KNN)

Background Subtraction Principle

Background Subtraction:
Separate moving foreground objects from stationary background

+-----------------+     +-----------------+     +-----------------+
| Current frame   |  -  | Background model|  =  | Foreground mask |
|                 |     |                 |     |                 |
|    +---+        |     |                 |     |    +---+        |
|    | * | (person)|    |   (empty room)  |     |    |###|        |
|    +---+        |     |                 |     |    +---+        |
|                 |     |                 |     |                 |
+-----------------+     +-----------------+     +-----------------+

Background model learning:
- Analyze multiple frames to learn background statistics
- Handle lighting changes, shadows, etc.
- Adapt to dynamic backgrounds (tree leaves, etc.)

MOG2 (Mixture of Gaussians)

import cv2
import numpy as np

# Create MOG2 background subtractor
backSub = cv2.createBackgroundSubtractorMOG2(
    history=500,          # Number of frames for background learning
    varThreshold=16,      # Variance threshold for background classification
    detectShadows=True    # Shadow detection
)

# Structuring element for the noise clean-up; it never changes, so build it
# once instead of on every frame.
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))

cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Apply background subtraction
    # fgMask: foreground=255, background=0, shadow=127
    fgMask = backSub.apply(frame)

    # Remove shadows (127 -> 0): only values above 200 survive as 255
    fgMask_no_shadow = cv2.threshold(fgMask, 200, 255, cv2.THRESH_BINARY)[1]

    # Remove noise: opening drops small specks, closing fills small holes
    fgMask_clean = cv2.morphologyEx(fgMask_no_shadow, cv2.MORPH_OPEN, kernel)
    fgMask_clean = cv2.morphologyEx(fgMask_clean, cv2.MORPH_CLOSE, kernel)

    # Extract foreground: keep frame pixels where the clean mask is set
    foreground = cv2.bitwise_and(frame, frame, mask=fgMask_clean)

    # Display results
    cv2.imshow('Original', frame)
    cv2.imshow('FG Mask', fgMask)
    cv2.imshow('Cleaned Mask', fgMask_clean)
    cv2.imshow('Foreground', foreground)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

KNN Background Subtraction

import cv2

# Create KNN background subtractor
backSub = cv2.createBackgroundSubtractorKNN(
    history=500,          # Background learning frame count
    dist2Threshold=400.0, # Distance threshold
    detectShadows=True    # Shadow detection
)

cap = cv2.VideoCapture('traffic.mp4')

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Background subtraction
    fgMask = backSub.apply(frame)

    # Remove noise
    fgMask = cv2.medianBlur(fgMask, 5)

    # Remove shadows. With detectShadows=True, shadow pixels get a mid-gray
    # value (127 by default); findContours treats every non-zero pixel as
    # foreground, so without this step shadows are boxed as moving objects.
    fgMask = cv2.threshold(fgMask, 200, 255, cv2.THRESH_BINARY)[1]

    # Contour detection
    contours, _ = cv2.findContours(fgMask, cv2.RETR_EXTERNAL,
                                    cv2.CHAIN_APPROX_SIMPLE)

    # Mark moving objects
    for contour in contours:
        area = cv2.contourArea(contour)
        if area > 500:  # Minimum area filter
            x, y, w, h = cv2.boundingRect(contour)
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

    cv2.imshow('Motion Detection', frame)
    cv2.imshow('Mask', fgMask)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()
# Close the windows as well, like the MOG2 example does
cv2.destroyAllWindows()

MOG2 vs KNN Comparison

+----------------+----------------------+----------------------+
|     Item       |        MOG2          |        KNN           |
+----------------+----------------------+----------------------+
| Algorithm      | Gaussian Mixture     | K-Nearest Neighbors  |
| Speed          | Fast                 | Medium               |
| Memory         | Low                  | High                 |
| Dynamic BG     | Medium               | Good                 |
| Lighting Change| Medium               | Good                 |
| Noise          | Sensitive            | Robust               |
| Recommended    | Static scenes,       | Complex scenes       |
|                | real-time            |                      |
+----------------+----------------------+----------------------+

6. Optical Flow

Optical Flow Concept

Optical Flow:
Estimate pixel movement between consecutive frames

Frame t                    Frame t+1
+-----------------+        +-----------------+
|                 |        |                 |
|    *            |   ->   |        *        |
|                 |        |                 |
+-----------------+        +-----------------+

Velocity vector (u, v):
- Pixel (x, y) moves to (x+u, y+v) in next frame
- I(x, y, t) = I(x+u, y+v, t+1) (brightness constancy assumption)

Types:
1. Sparse: Only compute movement for specific points (Lucas-Kanade)
2. Dense: Compute movement for all pixels (Farneback)

Lucas-Kanade Optical Flow

import cv2
import numpy as np

# Lucas-Kanade parameters
lk_params = dict(
    winSize=(15, 15),      # Search window size
    maxLevel=2,            # Pyramid levels
    criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
)

# Feature detection parameters
feature_params = dict(
    maxCorners=100,        # Maximum feature count
    qualityLevel=0.3,      # Quality level
    minDistance=7,         # Minimum distance
    blockSize=7
)

cap = cv2.VideoCapture(0)

# Read first frame
ret, old_frame = cap.read()
if not ret:
    # Without a first frame there is nothing to detect features on; the
    # cvtColor call below would fail on the missing image.
    print("Cannot read first frame")
    cap.release()
    exit()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)

# Detect features
p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)

# For trajectory visualization (lines accumulate here across frames)
mask = np.zeros_like(old_frame)

# Colors: one random color per feature slot (100 slots, reused modulo 100)
colors = np.random.randint(0, 255, (100, 3))

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    if p0 is not None and len(p0) > 0:
        # Compute optical flow
        p1, st, err = cv2.calcOpticalFlowPyrLK(
            old_gray, frame_gray, p0, None, **lk_params
        )

        if p1 is not None:
            # Select good points only (status 1 = flow was found)
            good_new = p1[st == 1]
            good_old = p0[st == 1]

            # Visualize movement
            for i, (new, old) in enumerate(zip(good_new, good_old)):
                a, b = new.ravel().astype(int)
                c, d = old.ravel().astype(int)

                # Trajectory line
                mask = cv2.line(mask, (a, b), (c, d),
                               colors[i % 100].tolist(), 2)
                # Current position point
                frame = cv2.circle(frame, (a, b), 5,
                                   colors[i % 100].tolist(), -1)

            # Update for next frame
            p0 = good_new.reshape(-1, 1, 2)

    # Combine trajectory
    img = cv2.add(frame, mask)

    cv2.imshow('Lucas-Kanade', img)

    key = cv2.waitKey(30) & 0xFF
    if key == ord('q'):
        break
    elif key == ord('r'):
        # Re-detect features with 'r' key and clear the drawn trajectories
        p0 = cv2.goodFeaturesToTrack(frame_gray, mask=None, **feature_params)
        mask = np.zeros_like(frame)

    old_gray = frame_gray.copy()

cap.release()
cv2.destroyAllWindows()

Farneback Dense Optical Flow

import cv2
import numpy as np

def draw_flow(img, flow, step=16):
    """Visualize flow vectors on a copy of img.

    Samples the dense flow field on a regular grid with spacing step and
    draws, for each sample, a green line from the grid point to the point
    displaced by the flow, plus a small dot at the grid point.

    Args:
        img: Image to draw on (not modified; a copy is returned).
        flow: Dense flow array indexed as flow[y, x] -> (dx, dy).
        step: Grid spacing in pixels.

    Returns:
        The annotated copy of img.
    """
    h, w = img.shape[:2]
    # Flatten the sampling grid to 1-D coordinate lists. Without the reshape
    # x/y stay 2-D, and after the transpose below fx/fy have the transposed
    # shape, so x+fx fails to broadcast for any non-square grid.
    y, x = np.mgrid[step/2:h:step, step/2:w:step].reshape(2, -1).astype(int)
    fx, fy = flow[y, x].T

    # Draw lines: one (start, end) point pair per sample, rounded to pixels
    lines = np.vstack([x, y, x+fx, y+fy]).T.reshape(-1, 2, 2)
    lines = np.int32(lines + 0.5)

    vis = img.copy()
    cv2.polylines(vis, lines, 0, (0, 255, 0))

    # Mark each vector's start point
    for (x1, y1), _end in lines:
        cv2.circle(vis, (x1, y1), 2, (0, 255, 0), -1)

    return vis

def flow_to_hsv(flow):
    """Render a dense flow field as a BGR image via HSV color coding.

    Flow direction selects the hue, saturation is fixed at maximum, and
    flow magnitude (min-max normalized to 0-255) sets the brightness.
    """
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    rows, cols = flow.shape[:2]
    hsv = np.zeros((rows, cols, 3), dtype=np.uint8)

    # Hue: angle in radians -> degrees, halved
    hsv[..., 0] = angle * 180 / np.pi / 2
    # Saturation: always full
    hsv[..., 1] = 255
    # Value: magnitude stretched over the full 0-255 range
    hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

cap = cv2.VideoCapture(0)

ret, frame1 = cap.read()
if not ret:
    # No first frame means no reference image for the flow computation; the
    # cvtColor call below would fail on the missing image.
    print("Cannot read first frame")
    cap.release()
    exit()
prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

while True:
    ret, frame2 = cap.read()
    if not ret:
        break

    next_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    # Farneback optical flow
    flow = cv2.calcOpticalFlowFarneback(
        prvs, next_gray,
        None,           # Initial flow
        pyr_scale=0.5,  # Pyramid scale
        levels=3,       # Pyramid levels
        winsize=15,     # Window size
        iterations=3,   # Iterations
        poly_n=5,       # Polynomial size
        poly_sigma=1.2, # Gaussian sigma
        flags=0
    )

    # Visualization
    flow_vis = draw_flow(frame2, flow)
    hsv_vis = flow_to_hsv(flow)

    cv2.imshow('Flow Vectors', flow_vis)
    cv2.imshow('Flow HSV', hsv_vis)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

    # The current frame becomes the reference for the next iteration
    prvs = next_gray

cap.release()
cv2.destroyAllWindows()

7. Object Tracking

OpenCV Built-in Trackers

import cv2

# Tracker types: maps a tracker name to the OpenCV factory that creates it.
# NOTE(review): the cv2.legacy entries presumably need an OpenCV build that
# ships the legacy tracking module - confirm against the installed package.
TRACKERS = dict(
    BOOSTING=cv2.legacy.TrackerBoosting_create,
    MIL=cv2.TrackerMIL_create,
    KCF=cv2.TrackerKCF_create,
    CSRT=cv2.TrackerCSRT_create,
    MOSSE=cv2.legacy.TrackerMOSSE_create,
)

def track_object(video_path, tracker_type='CSRT'):
    """Single object tracking.

    Shows the first frame so the user can drag a box around the object,
    then follows that object through the rest of the video.

    Args:
        video_path: Source passed to cv2.VideoCapture.
        tracker_type: Key into TRACKERS selecting the tracking algorithm.

    Returns early (after printing a message) when the first frame cannot be
    read or no region is selected. The capture and all windows are cleaned
    up on every exit path.
    """

    # Create tracker
    tracker = TRACKERS[tracker_type]()

    cap = cv2.VideoCapture(video_path)
    try:
        ret, frame = cap.read()
        if not ret:
            print("Cannot read first frame")
            return

        # Select object to track (mouse drag)
        bbox = cv2.selectROI('Select Object', frame, False)
        cv2.destroyWindow('Select Object')

        # A cancelled or empty selection has zero width/height and cannot
        # be used to initialize a tracker.
        if bbox[2] <= 0 or bbox[3] <= 0:
            print("No object selected")
            return

        # Initialize tracker
        tracker.init(frame, bbox)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Update tracking
            success, bbox = tracker.update(frame)

            if success:
                x, y, w, h = [int(v) for v in bbox]
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
                cv2.putText(frame, tracker_type, (x, y-10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            else:
                cv2.putText(frame, 'Tracking Failed', (100, 80),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            cv2.imshow('Tracking', frame)

            if cv2.waitKey(30) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()

# Usage example
track_object('video.mp4', 'CSRT')

Multi-object Tracking

import cv2

class MultiObjectTracker:
    """Multi-object tracker: one single-object tracker per selected box.

    Each tracked object is identified by its index in the order it was
    added and is drawn in its own random color.
    """

    # Tracker name -> name of the OpenCV factory function that creates it
    _FACTORY_NAMES = {
        'BOOSTING': 'TrackerBoosting_create',
        'MIL': 'TrackerMIL_create',
        'KCF': 'TrackerKCF_create',
        'CSRT': 'TrackerCSRT_create',
        'MOSSE': 'TrackerMOSSE_create',
    }

    def __init__(self, tracker_type='CSRT'):
        self.tracker_type = tracker_type
        self.trackers = []   # tracker objects, index == object id
        self.colors = []     # draw color per tracker, same indexing

    def _create_tracker(self):
        """Create a tracker of the configured type.

        Looks the factory up on cv2 first and then on cv2.legacy, if that
        namespace exists.

        Raises:
            ValueError: If the type is unknown or the OpenCV build in use
                does not provide it.
        """
        try:
            factory_name = self._FACTORY_NAMES[self.tracker_type]
        except KeyError:
            raise ValueError(
                f"Unknown tracker type: {self.tracker_type!r}") from None

        for namespace in (cv2, getattr(cv2, 'legacy', None)):
            factory = getattr(namespace, factory_name, None)
            if factory is not None:
                return factory()

        raise ValueError(
            f"Tracker type {self.tracker_type!r} is not available "
            "in this OpenCV build")

    def add_tracker(self, frame, bbox):
        """Add new tracker for the object inside bbox on frame."""
        # Honour the tracker_type chosen at construction time
        tracker = self._create_tracker()
        tracker.init(frame, bbox)
        self.trackers.append(tracker)
        # Plain Python ints, so the color is a regular numeric tuple
        self.colors.append((
            int(np.random.randint(0, 255)),
            int(np.random.randint(0, 255)),
            int(np.random.randint(0, 255))
        ))

    def update(self, frame):
        """Update all trackers.

        Returns:
            A list with one dict ('id', 'bbox', 'color') per tracker that
            reported success on this frame; failed trackers are omitted.
        """
        results = []

        for i, tracker in enumerate(self.trackers):
            success, bbox = tracker.update(frame)
            if success:
                results.append({
                    'id': i,
                    'bbox': bbox,
                    'color': self.colors[i]
                })

        return results

    def draw(self, frame, results):
        """Visualize results: draw each box and its ID onto frame."""
        for r in results:
            x, y, w, h = [int(v) for v in r['bbox']]
            cv2.rectangle(frame, (x, y), (x+w, y+h), r['color'], 2)
            cv2.putText(frame, f"ID: {r['id']}", (x, y-10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, r['color'], 2)
        return frame

# Usage example
import numpy as np

cap = cv2.VideoCapture(0)
multi_tracker = MultiObjectTracker()

ret, frame = cap.read()
if not ret:
    # Nothing to select objects on without a first frame
    print("Cannot read first frame")
    cap.release()
    exit()

# Select multiple objects (ESC to finish)
while True:
    bbox = cv2.selectROI('Select Objects (Press ESC when done)', frame, False)
    # A cancelled selection comes back as (0, 0, 0, 0); any box with zero
    # width or height is unusable for tracking, so stop on those as well.
    if bbox[2] <= 0 or bbox[3] <= 0:
        break
    multi_tracker.add_tracker(frame, bbox)

cv2.destroyWindow('Select Objects (Press ESC when done)')

while True:
    ret, frame = cap.read()
    if not ret:
        break

    results = multi_tracker.update(frame)
    frame = multi_tracker.draw(frame, results)

    cv2.imshow('Multi Tracking', frame)
    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()
# Close the tracking window as well, like the other examples do
cv2.destroyAllWindows()

Background Subtraction + Tracking Combined

import cv2
import numpy as np

class MotionTracker:
    """Background subtraction-based motion tracking.

    Moving regions are found with a MOG2 background subtractor, and each
    region's centroid is matched to the nearest existing track to give
    objects a stable ID across frames.
    """

    # A track seen in fewer frames than this is dropped as soon as it is
    # not matched in a frame (treated as noise).
    MIN_CONFIRMED_FRAMES = 10
    # An established track is dropped after being unmatched for more than
    # this many consecutive frames.
    MAX_MISSED_FRAMES = 10

    def __init__(self):
        self.bg_subtractor = cv2.createBackgroundSubtractorMOG2(
            history=500, varThreshold=16, detectShadows=True
        )
        self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        # {id: {'centroid': (x,y), 'bbox': (x,y,w,h), 'frames': count,
        #       'missed': consecutive unmatched frames}}
        self.tracks = {}
        self.next_id = 0
        self.max_distance = 50  # Distance for same object judgment

    def process(self, frame):
        """Process frame.

        Returns:
            (fg_mask, current_objects): the cleaned binary foreground mask
            and a list of dicts with 'centroid', 'bbox' and 'id' for every
            contour whose area exceeds 500.
        """
        # Background subtraction; the threshold removes shadow pixels
        fg_mask = self.bg_subtractor.apply(frame)
        fg_mask = cv2.threshold(fg_mask, 200, 255, cv2.THRESH_BINARY)[1]

        # Remove noise
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, self.kernel)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, self.kernel)

        # Contour detection
        contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL,
                                        cv2.CHAIN_APPROX_SIMPLE)

        # Current frame's objects
        current_objects = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > 500:
                x, y, w, h = cv2.boundingRect(contour)
                centroid = (x + w//2, y + h//2)
                current_objects.append({
                    'centroid': centroid,
                    'bbox': (x, y, w, h)
                })

        # Match with existing tracks
        self._match_tracks(current_objects)

        return fg_mask, current_objects

    def _match_tracks(self, current_objects):
        """Match current objects with existing tracks.

        Every object gets an 'id' key: the id of the closest not yet
        claimed track within max_distance, or a fresh id otherwise.
        Unmatched tracks are aged and removed as described on the class
        constants.
        """
        matched = set()

        for obj in current_objects:
            cx, cy = obj['centroid']
            best_match = None
            best_dist = float('inf')

            # Find closest existing track
            for track_id, track in self.tracks.items():
                # A track claimed (or created) in this frame is off limits,
                # so two objects can never end up with the same ID.
                if track_id in matched:
                    continue

                tx, ty = track['centroid']
                dist = np.sqrt((cx-tx)**2 + (cy-ty)**2)

                if dist < self.max_distance and dist < best_dist:
                    best_dist = dist
                    best_match = track_id

            if best_match is not None:
                # Update existing track
                track = self.tracks[best_match]
                track['centroid'] = obj['centroid']
                track['bbox'] = obj['bbox']
                track['frames'] += 1
                track['missed'] = 0
                obj['id'] = best_match
            else:
                # Create new track
                obj['id'] = self.next_id
                self.tracks[self.next_id] = {
                    'centroid': obj['centroid'],
                    'bbox': obj['bbox'],
                    'frames': 1,
                    'missed': 0
                }
                self.next_id += 1

            # Counts for new tracks too; otherwise the clean-up below would
            # delete a track in the very frame that created it.
            matched.add(obj['id'])

        # Remove old tracks
        to_remove = [tid for tid in self.tracks if tid not in matched]
        for tid in to_remove:
            track = self.tracks[tid]
            track['missed'] = track.get('missed', 0) + 1
            if (track['frames'] < self.MIN_CONFIRMED_FRAMES
                    or track['missed'] > self.MAX_MISSED_FRAMES):
                del self.tracks[tid]

    def draw(self, frame, objects):
        """Visualize: draw each object's box and, if present, its ID."""
        for obj in objects:
            x, y, w, h = obj['bbox']
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

            if 'id' in obj:
                cv2.putText(frame, f"ID: {obj['id']}", (x, y-10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return frame

# Usage example
cap = cv2.VideoCapture(0)
tracker = MotionTracker()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Detect moving objects, then draw their boxes and IDs onto the frame
    mask, objects = tracker.process(frame)
    output = tracker.draw(frame, objects)

    cv2.imshow('Motion Tracking', output)
    cv2.imshow('Mask', mask)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()
# Close both windows as well, like the other examples do
cv2.destroyAllWindows()

8. Practice Problems

Problem 1: Video Player

Implement a basic video player.

Requirements:
- Play/pause toggle (spacebar)
- Forward/backward skip (arrow keys)
- Frame-by-frame navigation (`.` and `,` keys)
- Display current time/total time
- Progress bar

Hint
# Frame navigation: move the capture position to target_frame
# (cap and target_frame are expected to be defined by the player)
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)

# Key handling: wait up to `delay` ms and keep only the low byte of the code
key = cv2.waitKey(delay) & 0xFF
if key == ord(' '):  # Spacebar
    paused = not paused
elif key == 83:  # Right arrow; NOTE(review): arrow-key codes presumably vary by platform/GUI backend - confirm
    skip_forward()

Problem 2: Motion Heatmap

Visualize areas with lots of motion as a heatmap.

Requirements:
- Detect motion with background subtraction
- Generate accumulated motion map
- Apply colormap (COLORMAP_JET)
- Blend original with heatmap

Hint
# Initialize accumulation map: one float32 cell per pixel, so values can
# keep growing as frames are added
accumulator = np.zeros((height, width), dtype=np.float32)

# Accumulate per frame: the mask is divided by 255, so a pixel whose mask
# value is 255 adds 1.0 to its cell
accumulator += fg_mask.astype(np.float32) / 255.0

# Normalize and apply colormap: stretch the accumulated values to 0-255,
# convert to uint8 and color them with the JET colormap
normalized = cv2.normalize(accumulator, None, 0, 255, cv2.NORM_MINMAX)
heatmap = cv2.applyColorMap(normalized.astype(np.uint8), cv2.COLORMAP_JET)

Problem 3: Speed Measurement

Measure object movement speed using optical flow.

Requirements:
- Compute average flow within specific ROI
- Convert pixel speed to actual speed (calibration needed)
- Display real-time speed graph

Hint
# Average flow in ROI: crop the flow field to the rectangle (x, y, w, h)
# and average over both spatial axes, leaving one mean vector
roi_flow = flow[y:y+h, x:x+w]
avg_flow = np.mean(roi_flow, axis=(0, 1))

# Speed calculation (pixels/frame): length of the mean flow vector
speed = np.sqrt(avg_flow[0]**2 + avg_flow[1]**2)

# Convert to actual speed (e.g., 1 pixel = 1cm, 30fps)
# (pixels/frame) * (cm per pixel) * (frames per second) = cm/s;
# presumably pixels_to_cm is the calibrated cm-per-pixel scale - confirm
real_speed = speed * pixels_to_cm * fps  # cm/s

Problem 4: Vehicle Counter

Count vehicles passing through in road video.

Requirements:
- Detect vehicles with background subtraction
- Set virtual line (counting line)
- Count objects crossing the line
- Distinguish entry/exit direction

Hint
# Define virtual line
line_y = height // 2

# Check if object crossed line
def crossed_line(prev_y, curr_y, line_y):
    """Classify an object's movement relative to a horizontal line.

    Returns 'down' when the object was above the line (smaller y) and is
    now on or below it, 'up' when it was below the line and is now on or
    above it, and None otherwise (including when it started on the line).
    """
    started_above = prev_y < line_y
    started_below = prev_y > line_y

    direction = None
    if started_above and curr_y >= line_y:
        direction = 'down'   # top to bottom
    elif started_below and curr_y <= line_y:
        direction = 'up'     # bottom to top
    return direction

Problem 5: Gesture Recognition

Analyze optical flow patterns to recognize simple gestures (hand waving, drawing circles).

Requirements:
- Detect hand region (skin color-based)
- Track movement patterns
- Classify patterns (rule-based or template matching)
- Display recognized gesture

Hint
# Skin color detection (HSV): keep pixels whose H, S and V all lie between
# the lower and upper bounds (H 0-20, S 20-255, V 70-255)
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
lower_skin = np.array([0, 20, 70])
upper_skin = np.array([20, 255, 255])
mask = cv2.inRange(hsv, lower_skin, upper_skin)

# Store movement trajectory: append the hand centroid for each frame
# (centroid is expected to be computed from the mask by the solution)
trajectory = []
trajectory.append(centroid)

# Trajectory analysis
# Hand waving: oscillation in x direction
# Circle drawing: start and end points close + certain area

Next Steps


References

to navigate between lessons