비디오 처리 (Video Processing)

비디오 처리 (Video Processing)

개요

비디오는 연속된 이미지 프레임의 시퀀스입니다. OpenCV를 사용하여 비디오 파일과 카메라 스트림을 처리하고, 배경 차분과 옵티컬 플로우를 이용한 동작 분석 방법을 학습합니다.

난이도: ⭐⭐⭐

선수 지식: 이미지 기초 연산, 필터링, 객체 검출


목차

  1. VideoCapture: 파일과 카메라
  2. VideoWriter: 비디오 저장
  3. 프레임 단위 처리
  4. FPS 계산
  5. 배경 차분 (MOG2, KNN)
  6. 옵티컬 플로우
  7. 객체 추적
  8. 연습 문제

1. VideoCapture: 파일과 카메라

비디오 구조 이해

비디오 = 연속된 이미지 프레임

시간 ──────────────────────────────────────────▶
    ┌─────┐┌─────┐┌─────┐┌─────┐┌─────┐
    │Frame││Frame││Frame││Frame││Frame│ ...
    │  1  ││  2  ││  3  ││  4  ││  5  │
    └─────┘└─────┘└─────┘└─────┘└─────┘

FPS (Frames Per Second): 초당 프레임 수
- 24 FPS: 영화 표준
- 30 FPS: 일반 비디오
- 60 FPS: 게임, 스포츠
- 120+ FPS: 슬로모션

해상도: 각 프레임의 크기
- 640x480: VGA
- 1280x720: HD (720p)
- 1920x1080: Full HD (1080p)
- 3840x2160: 4K

비디오 파일 읽기

import cv2

# 비디오 파일 열기
cap = cv2.VideoCapture('video.mp4')

# 열기 성공 확인
if not cap.isOpened():
    print("비디오를 열 수 없습니다")
    exit()

# 비디오 속성 가져오기
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count / fps

print(f"해상도: {width}x{height}")
print(f"FPS: {fps}")
print(f"총 프레임: {frame_count}")
print(f"재생 시간: {duration:.2f}초")

# 프레임 읽기 루프
while True:
    ret, frame = cap.read()

    if not ret:
        print("비디오 끝 또는 에러")
        break

    # 프레임 처리
    cv2.imshow('Video', frame)

    # 'q' 키로 종료, 1ms 대기
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 자원 해제
cap.release()
cv2.destroyAllWindows()

카메라 입력

import cv2

# 카메라 열기 (장치 ID: 0=기본 카메라)
cap = cv2.VideoCapture(0)

# 카메라 열기 실패 시
if not cap.isOpened():
    print("카메라를 열 수 없습니다")
    exit()

# 카메라 속성 설정
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)

# 버퍼 크기 설정 (지연 감소)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

print(f"카메라 해상도: {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x"
      f"{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}")

while True:
    ret, frame = cap.read()

    if not ret:
        continue

    # 좌우 반전 (거울 효과)
    frame = cv2.flip(frame, 1)

    cv2.imshow('Camera', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

주요 VideoCapture 속성

import cv2

cap = cv2.VideoCapture('video.mp4')

# 읽기 속성
properties = {
    'CAP_PROP_FRAME_WIDTH': cv2.CAP_PROP_FRAME_WIDTH,    # 프레임 너비
    'CAP_PROP_FRAME_HEIGHT': cv2.CAP_PROP_FRAME_HEIGHT,  # 프레임 높이
    'CAP_PROP_FPS': cv2.CAP_PROP_FPS,                    # FPS
    'CAP_PROP_FRAME_COUNT': cv2.CAP_PROP_FRAME_COUNT,    # 총 프레임 수
    'CAP_PROP_POS_FRAMES': cv2.CAP_PROP_POS_FRAMES,      # 현재 프레임 위치
    'CAP_PROP_POS_MSEC': cv2.CAP_PROP_POS_MSEC,          # 현재 위치 (밀리초)
    'CAP_PROP_FOURCC': cv2.CAP_PROP_FOURCC,              # 코덱 4문자 코드
    'CAP_PROP_BRIGHTNESS': cv2.CAP_PROP_BRIGHTNESS,      # 밝기 (카메라)
    'CAP_PROP_CONTRAST': cv2.CAP_PROP_CONTRAST,          # 대비 (카메라)
}

for name, prop in properties.items():
    value = cap.get(prop)
    print(f"{name}: {value}")

# 특정 프레임으로 이동
cap.set(cv2.CAP_PROP_POS_FRAMES, 100)  # 100번째 프레임으로

# 특정 시간으로 이동 (밀리초)
cap.set(cv2.CAP_PROP_POS_MSEC, 5000)  # 5초 위치로

cap.release()

2. VideoWriter: 비디오 저장

기본 비디오 저장

import cv2

# 비디오 캡처 설정
cap = cv2.VideoCapture(0)

# 비디오 속성
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = 30.0

# 코덱 설정 (4문자 코드)
# 'XVID': AVI 컨테이너용
# 'mp4v': MP4 컨테이너용
# 'MJPG': Motion JPEG
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# VideoWriter 생성
out = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))

print("녹화 시작... 'q'를 눌러 종료")

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # 프레임 저장
    out.write(frame)

    # 녹화 표시
    cv2.circle(frame, (30, 30), 10, (0, 0, 255), -1)  # 빨간 원
    cv2.putText(frame, 'REC', (50, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    cv2.imshow('Recording', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 자원 해제
cap.release()
out.release()
cv2.destroyAllWindows()
print("녹화 완료: output.mp4")

주요 코덱

┌─────────────┬─────────────┬────────────────────────┐
│   코덱      │  컨테이너   │         특징           │
├─────────────┼─────────────┼────────────────────────┤
│ 'XVID'      │ .avi        │ 널리 지원, 적당한 압축 │
│ 'MJPG'      │ .avi        │ Motion JPEG, 빠름      │
│ 'mp4v'      │ .mp4        │ MPEG-4, 호환성 좋음    │
│ 'avc1'      │ .mp4        │ H.264, 고압축률        │
│ 'X264'      │ .mp4        │ H.264 (요구사항 있음)  │
│ 'VP80'      │ .webm       │ VP8, 웹용              │
│ 'VP90'      │ .webm       │ VP9, 고효율            │
└─────────────┴─────────────┴────────────────────────┘

# 코덱 테스트
def test_codec(codec_str, extension):
    fourcc = cv2.VideoWriter_fourcc(*codec_str)
    out = cv2.VideoWriter(f'test.{extension}', fourcc, 30, (640, 480))
    if out.isOpened():
        print(f"{codec_str}: 지원됨")
        out.release()
        return True
    else:
        print(f"{codec_str}: 지원 안 됨")
        return False

처리된 비디오 저장

import cv2

def process_and_save_video(input_path, output_path, process_func):
    """비디오 처리 후 저장"""

    cap = cv2.VideoCapture(input_path)

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    frame_num = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # 프레임 처리
        processed = process_func(frame)

        # 저장
        out.write(processed)

        # 진행률 표시
        frame_num += 1
        progress = (frame_num / total_frames) * 100
        print(f"\r처리 중: {progress:.1f}%", end='')

    print("\n완료!")

    cap.release()
    out.release()

# 사용 예: 그레이스케일 변환 및 엣지 검출
def edge_detection(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    # 3채널로 변환 (VideoWriter는 컬러 비디오로 설정됨)
    return cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)

process_and_save_video('input.mp4', 'edges.mp4', edge_detection)

3. 프레임 단위 처리

프레임 처리 파이프라인

프레임 처리 파이프라인:

입력 ──▶ 전처리 ──▶ 분석 ──▶ 후처리 ──▶ 출력
         │          │         │
         ▼          ▼         ▼
      - 리사이즈  - 검출    - 시각화
      - 색 변환  - 추적    - 필터링
      - 노이즈   - 인식    - 합성
        제거

다중 처리 예제

import cv2
import numpy as np

class VideoProcessor:
    """비디오 프레임 처리기"""

    def __init__(self):
        self.processors = []

    def add_processor(self, name, func):
        """처리 함수 추가"""
        self.processors.append((name, func))

    def process_frame(self, frame):
        """모든 처리 함수 적용"""
        result = frame.copy()
        for name, func in self.processors:
            result = func(result)
        return result

    def process_video(self, input_source, output_path=None, display=True):
        """비디오 처리"""
        cap = cv2.VideoCapture(input_source)

        out = None
        if output_path:
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # 처리
            processed = self.process_frame(frame)

            # 저장
            if out:
                out.write(processed)

            # 표시
            if display:
                cv2.imshow('Processed', processed)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        cap.release()
        if out:
            out.release()
        cv2.destroyAllWindows()

# 사용 예
processor = VideoProcessor()

# 처리 함수들 추가
processor.add_processor('blur', lambda f: cv2.GaussianBlur(f, (5, 5), 0))
processor.add_processor('edge', lambda f: cv2.Canny(f, 50, 150))

def add_timestamp(frame):
    import datetime
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    cv2.putText(frame, now, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    return frame

processor.add_processor('timestamp', add_timestamp)

# 웹캠 처리
processor.process_video(0, output_path='recorded.mp4')

프레임 건너뛰기와 버퍼링

import cv2
import time

def skip_frames_processing(video_path, skip=5):
    """프레임 건너뛰기 (속도 향상)"""

    cap = cv2.VideoCapture(video_path)

    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        # skip 프레임마다 처리
        if frame_count % skip != 0:
            continue

        # 무거운 처리 수행
        processed = heavy_processing(frame)

        cv2.imshow('Skipped Processing', processed)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()

def buffered_reading(video_path, buffer_size=10):
    """프레임 버퍼링 (스무스한 재생)"""
    from collections import deque
    from threading import Thread

    cap = cv2.VideoCapture(video_path)
    buffer = deque(maxlen=buffer_size)
    stop_flag = False

    def read_frames():
        while not stop_flag:
            ret, frame = cap.read()
            if not ret:
                break
            if len(buffer) < buffer_size:
                buffer.append(frame)

    # 읽기 스레드 시작
    thread = Thread(target=read_frames)
    thread.start()

    # 초기 버퍼 채우기 대기
    time.sleep(0.5)

    while True:
        if len(buffer) > 0:
            frame = buffer.popleft()
            cv2.imshow('Buffered', frame)

        if cv2.waitKey(30) & 0xFF == ord('q'):
            break

    stop_flag = True
    thread.join()
    cap.release()

4. FPS 계산

FPS 측정 방법

import cv2
import time

class FPSCounter:
    """FPS 측정 클래스"""

    def __init__(self, avg_frames=30):
        self.frame_times = []
        self.avg_frames = avg_frames
        self.last_time = time.time()

    def update(self):
        """프레임 처리 후 호출"""
        current_time = time.time()
        self.frame_times.append(current_time - self.last_time)
        self.last_time = current_time

        # 최근 N개 프레임만 유지
        if len(self.frame_times) > self.avg_frames:
            self.frame_times.pop(0)

    def get_fps(self):
        """현재 FPS 반환"""
        if len(self.frame_times) == 0:
            return 0
        avg_time = sum(self.frame_times) / len(self.frame_times)
        return 1.0 / avg_time if avg_time > 0 else 0

# 사용 예
cap = cv2.VideoCapture(0)
fps_counter = FPSCounter()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # 프레임 처리
    # ...

    fps_counter.update()
    fps = fps_counter.get_fps()

    # FPS 표시
    cv2.putText(frame, f'FPS: {fps:.1f}', (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    cv2.imshow('FPS', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()

처리 시간 분석

import cv2
import time

class PerformanceMonitor:
    """성능 모니터링"""

    def __init__(self):
        self.timings = {}

    def start(self, name):
        """타이밍 시작"""
        self.timings[name] = {'start': time.time()}

    def stop(self, name):
        """타이밍 종료"""
        if name in self.timings:
            elapsed = time.time() - self.timings[name]['start']
            self.timings[name]['elapsed'] = elapsed
            return elapsed
        return 0

    def get_report(self):
        """성능 리포트"""
        report = []
        for name, data in self.timings.items():
            if 'elapsed' in data:
                report.append(f"{name}: {data['elapsed']*1000:.2f}ms")
        return '\n'.join(report)

# 사용 예
monitor = PerformanceMonitor()

cap = cv2.VideoCapture(0)

while True:
    # 전체 프레임 시간 측정
    monitor.start('total')

    ret, frame = cap.read()
    if not ret:
        break

    # 전처리 시간 측정
    monitor.start('preprocess')
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    monitor.stop('preprocess')

    # 검출 시간 측정
    monitor.start('detection')
    edges = cv2.Canny(blur, 50, 150)
    monitor.stop('detection')

    monitor.stop('total')

    # 성능 표시
    y = 30
    for line in monitor.get_report().split('\n'):
        cv2.putText(frame, line, (10, y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        y += 20

    cv2.imshow('Performance', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()

5. 배경 차분 (MOG2, KNN)

배경 차분 원리

배경 차분 (Background Subtraction):
움직이는 전경 객체를 정지된 배경으로부터 분리

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ 현재 프레임     │  -  │   배경 모델     │  =  │   전경 마스크   │
│                 │     │                 │     │                 │
│    ┌───┐        │     │                 │     │    ┌───┐        │
│    │ ● │ (사람) │     │   (빈 방)       │     │    │███│        │
│    └───┘        │     │                 │     │    └───┘        │
│                 │     │                 │     │                 │
└─────────────────┘     └─────────────────┘     └─────────────────┘

배경 모델 학습:
- 여러 프레임을 분석하여 배경 통계 학습
- 조명 변화, 그림자 등 처리
- 동적 배경 (나뭇잎 흔들림 등) 대응

MOG2 (Mixture of Gaussians)

import cv2
import numpy as np

# MOG2 배경 차분기 생성
backSub = cv2.createBackgroundSubtractorMOG2(
    history=500,          # 배경 학습에 사용할 프레임 수
    varThreshold=16,      # 픽셀이 배경으로 판단되는 분산 임계값
    detectShadows=True    # 그림자 검출 여부
)

cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # 배경 차분 적용
    # fgMask: 전경=255, 배경=0, 그림자=127
    fgMask = backSub.apply(frame)

    # 그림자 제거 (127 -> 0)
    fgMask_no_shadow = cv2.threshold(fgMask, 200, 255, cv2.THRESH_BINARY)[1]

    # 노이즈 제거
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    fgMask_clean = cv2.morphologyEx(fgMask_no_shadow, cv2.MORPH_OPEN, kernel)
    fgMask_clean = cv2.morphologyEx(fgMask_clean, cv2.MORPH_CLOSE, kernel)

    # 전경 추출
    foreground = cv2.bitwise_and(frame, frame, mask=fgMask_clean)

    # 결과 표시
    cv2.imshow('Original', frame)
    cv2.imshow('FG Mask', fgMask)
    cv2.imshow('Cleaned Mask', fgMask_clean)
    cv2.imshow('Foreground', foreground)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

KNN 배경 차분

import cv2

# KNN 배경 차분기 생성
backSub = cv2.createBackgroundSubtractorKNN(
    history=500,          # 배경 학습 프레임 수
    dist2Threshold=400.0, # 거리 임계값
    detectShadows=True    # 그림자 검출
)

cap = cv2.VideoCapture('traffic.mp4')

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # 배경 차분
    fgMask = backSub.apply(frame)

    # 노이즈 제거
    fgMask = cv2.medianBlur(fgMask, 5)

    # 윤곽선 검출
    contours, _ = cv2.findContours(fgMask, cv2.RETR_EXTERNAL,
                                    cv2.CHAIN_APPROX_SIMPLE)

    # 움직이는 객체 표시
    for contour in contours:
        area = cv2.contourArea(contour)
        if area > 500:  # 최소 면적 필터
            x, y, w, h = cv2.boundingRect(contour)
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

    cv2.imshow('Motion Detection', frame)
    cv2.imshow('Mask', fgMask)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()

MOG2 vs KNN 비교

┌────────────────┬──────────────────────┬──────────────────────┐
│     항목       │        MOG2          │        KNN           │
├────────────────┼──────────────────────┼──────────────────────┤
│ 알고리즘       │ 가우시안 혼합 모델   │ K-최근접 이웃        │
│ 속도           │ 빠름                 │ 중간                 │
│ 메모리         │ 적음                 │ 많음                 │
│ 동적 배경      │ 보통                 │ 좋음                 │
│ 조명 변화      │ 보통                 │ 좋음                 │
│ 노이즈         │ 민감                 │ 강건                 │
│ 추천 상황      │ 정적 장면, 실시간    │ 복잡한 장면          │
└────────────────┴──────────────────────┴──────────────────────┘

6. 옵티컬 플로우

옵티컬 플로우 개념

옵티컬 플로우 (Optical Flow):
연속된 프레임 사이의 픽셀 움직임을 추정

프레임 t                    프레임 t+1
┌─────────────────┐        ┌─────────────────┐
│                 │        │                 │
│    ●            │   →    │        ●        │
│                 │        │                 │
└─────────────────┘        └─────────────────┘

속도 벡터 (u, v):
- 픽셀 (x, y)가 다음 프레임에서 (x+u, y+v)로 이동
- I(x, y, t) = I(x+u, y+v, t+1) (밝기 항상성 가정)

종류:
1. Sparse (희소): 특정 점들의 움직임만 계산 (Lucas-Kanade)
2. Dense (밀집): 모든 픽셀의 움직임 계산 (Farneback)

Lucas-Kanade 옵티컬 플로우

import cv2
import numpy as np

# Lucas-Kanade 파라미터
lk_params = dict(
    winSize=(15, 15),      # 검색 윈도우 크기
    maxLevel=2,            # 피라미드 레벨
    criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
)

# 특징점 검출 파라미터
feature_params = dict(
    maxCorners=100,        # 최대 특징점 수
    qualityLevel=0.3,      # 품질 수준
    minDistance=7,         # 최소 거리
    blockSize=7
)

cap = cv2.VideoCapture(0)

# 첫 프레임 읽기
ret, old_frame = cap.read()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)

# 특징점 검출
p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)

# 추적 궤적 시각화용
mask = np.zeros_like(old_frame)

# 색상
colors = np.random.randint(0, 255, (100, 3))

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    if p0 is not None and len(p0) > 0:
        # 옵티컬 플로우 계산
        p1, st, err = cv2.calcOpticalFlowPyrLK(
            old_gray, frame_gray, p0, None, **lk_params
        )

        if p1 is not None:
            # 좋은 점만 선택
            good_new = p1[st == 1]
            good_old = p0[st == 1]

            # 움직임 시각화
            for i, (new, old) in enumerate(zip(good_new, good_old)):
                a, b = new.ravel().astype(int)
                c, d = old.ravel().astype(int)

                # 궤적 선
                mask = cv2.line(mask, (a, b), (c, d),
                               colors[i % 100].tolist(), 2)
                # 현재 위치 점
                frame = cv2.circle(frame, (a, b), 5,
                                   colors[i % 100].tolist(), -1)

            # 다음 프레임을 위한 업데이트
            p0 = good_new.reshape(-1, 1, 2)

    # 궤적 합성
    img = cv2.add(frame, mask)

    cv2.imshow('Lucas-Kanade', img)

    key = cv2.waitKey(30) & 0xFF
    if key == ord('q'):
        break
    elif key == ord('r'):
        # 'r' 키로 특징점 재검출
        p0 = cv2.goodFeaturesToTrack(frame_gray, mask=None, **feature_params)
        mask = np.zeros_like(frame)

    old_gray = frame_gray.copy()

cap.release()
cv2.destroyAllWindows()

Farneback 밀집 옵티컬 플로우

import cv2
import numpy as np

def draw_flow(img, flow, step=16):
    """플로우 벡터 시각화"""
    h, w = img.shape[:2]
    y, x = np.mgrid[step/2:h:step, step/2:w:step].astype(int)
    fx, fy = flow[y, x].T

    # 선 그리기
    lines = np.vstack([x, y, x+fx, y+fy]).T.reshape(-1, 2, 2)
    lines = np.int32(lines + 0.5)

    vis = img.copy()
    cv2.polylines(vis, lines, 0, (0, 255, 0))

    for (x1, y1), (x2, y2) in lines:
        cv2.circle(vis, (x1, y1), 2, (0, 255, 0), -1)

    return vis

def flow_to_hsv(flow):
    """플로우를 HSV 색상으로 변환"""
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    hsv = np.zeros((*flow.shape[:2], 3), dtype=np.uint8)
    hsv[..., 0] = ang * 180 / np.pi / 2  # 방향 -> Hue
    hsv[..., 1] = 255  # Saturation
    hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)  # 크기 -> Value

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

cap = cv2.VideoCapture(0)

ret, frame1 = cap.read()
prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

while True:
    ret, frame2 = cap.read()
    if not ret:
        break

    next_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    # Farneback 옵티컬 플로우
    flow = cv2.calcOpticalFlowFarneback(
        prvs, next_gray,
        None,           # 초기 플로우
        pyr_scale=0.5,  # 피라미드 스케일
        levels=3,       # 피라미드 레벨
        winsize=15,     # 윈도우 크기
        iterations=3,   # 반복 횟수
        poly_n=5,       # 다항식 크기
        poly_sigma=1.2, # 가우시안 시그마
        flags=0
    )

    # 시각화
    flow_vis = draw_flow(frame2, flow)
    hsv_vis = flow_to_hsv(flow)

    cv2.imshow('Flow Vectors', flow_vis)
    cv2.imshow('Flow HSV', hsv_vis)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

    prvs = next_gray

cap.release()
cv2.destroyAllWindows()

7. 객체 추적

OpenCV 내장 트래커

import cv2

# 트래커 종류
TRACKERS = {
    'BOOSTING': cv2.legacy.TrackerBoosting_create,
    'MIL': cv2.TrackerMIL_create,
    'KCF': cv2.TrackerKCF_create,
    'CSRT': cv2.TrackerCSRT_create,
    'MOSSE': cv2.legacy.TrackerMOSSE_create
}

def track_object(video_path, tracker_type='CSRT'):
    """단일 객체 추적"""

    # 트래커 생성
    tracker = TRACKERS[tracker_type]()

    cap = cv2.VideoCapture(video_path)
    ret, frame = cap.read()

    # 추적할 객체 선택 (마우스 드래그)
    bbox = cv2.selectROI('Select Object', frame, False)
    cv2.destroyWindow('Select Object')

    # 트래커 초기화
    tracker.init(frame, bbox)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # 추적 업데이트
        success, bbox = tracker.update(frame)

        if success:
            x, y, w, h = [int(v) for v in bbox]
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
            cv2.putText(frame, tracker_type, (x, y-10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        else:
            cv2.putText(frame, 'Tracking Failed', (100, 80),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        cv2.imshow('Tracking', frame)

        if cv2.waitKey(30) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# 사용 예
track_object('video.mp4', 'CSRT')

다중 객체 추적

import cv2

class MultiObjectTracker:
    """다중 객체 추적기"""

    def __init__(self, tracker_type='CSRT'):
        self.tracker_type = tracker_type
        self.trackers = []
        self.colors = []

    def add_tracker(self, frame, bbox):
        """새 트래커 추가"""
        tracker = cv2.TrackerCSRT_create()
        tracker.init(frame, bbox)
        self.trackers.append(tracker)
        self.colors.append((
            np.random.randint(0, 255),
            np.random.randint(0, 255),
            np.random.randint(0, 255)
        ))

    def update(self, frame):
        """모든 트래커 업데이트"""
        results = []

        for i, tracker in enumerate(self.trackers):
            success, bbox = tracker.update(frame)
            if success:
                results.append({
                    'id': i,
                    'bbox': bbox,
                    'color': self.colors[i]
                })

        return results

    def draw(self, frame, results):
        """결과 시각화"""
        for r in results:
            x, y, w, h = [int(v) for v in r['bbox']]
            cv2.rectangle(frame, (x, y), (x+w, y+h), r['color'], 2)
            cv2.putText(frame, f"ID: {r['id']}", (x, y-10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, r['color'], 2)
        return frame

# 사용 예
import numpy as np

cap = cv2.VideoCapture(0)
multi_tracker = MultiObjectTracker()

ret, frame = cap.read()

# 여러 객체 선택 (ESC로 종료)
while True:
    bbox = cv2.selectROI('Select Objects (Press ESC when done)', frame, False)
    if bbox == (0, 0, 0, 0):  # ESC 누름
        break
    multi_tracker.add_tracker(frame, bbox)

cv2.destroyWindow('Select Objects (Press ESC when done)')

while True:
    ret, frame = cap.read()
    if not ret:
        break

    results = multi_tracker.update(frame)
    frame = multi_tracker.draw(frame, results)

    cv2.imshow('Multi Tracking', frame)
    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()

배경 차분 + 추적 결합

import cv2
import numpy as np

class MotionTracker:
    """배경 차분 기반 움직임 추적"""

    def __init__(self):
        self.bg_subtractor = cv2.createBackgroundSubtractorMOG2(
            history=500, varThreshold=16, detectShadows=True
        )
        self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        self.tracks = {}  # {id: {'centroid': (x,y), 'frames': count}}
        self.next_id = 0
        self.max_distance = 50  # 동일 객체 판단 거리

    def process(self, frame):
        """프레임 처리"""
        # 배경 차분
        fg_mask = self.bg_subtractor.apply(frame)
        fg_mask = cv2.threshold(fg_mask, 200, 255, cv2.THRESH_BINARY)[1]

        # 노이즈 제거
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, self.kernel)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, self.kernel)

        # 윤곽선 검출
        contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL,
                                        cv2.CHAIN_APPROX_SIMPLE)

        # 현재 프레임의 객체들
        current_objects = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > 500:
                x, y, w, h = cv2.boundingRect(contour)
                centroid = (x + w//2, y + h//2)
                current_objects.append({
                    'centroid': centroid,
                    'bbox': (x, y, w, h)
                })

        # 기존 트랙과 매칭
        self._match_tracks(current_objects)

        return fg_mask, current_objects

    def _match_tracks(self, current_objects):
        """현재 객체와 기존 트랙 매칭"""
        matched = set()

        for obj in current_objects:
            cx, cy = obj['centroid']
            best_match = None
            best_dist = float('inf')

            # 가장 가까운 기존 트랙 찾기
            for track_id, track in self.tracks.items():
                tx, ty = track['centroid']
                dist = np.sqrt((cx-tx)**2 + (cy-ty)**2)

                if dist < self.max_distance and dist < best_dist:
                    best_dist = dist
                    best_match = track_id

            if best_match is not None:
                # 기존 트랙 업데이트
                self.tracks[best_match]['centroid'] = obj['centroid']
                self.tracks[best_match]['bbox'] = obj['bbox']
                self.tracks[best_match]['frames'] += 1
                obj['id'] = best_match
                matched.add(best_match)
            else:
                # 새 트랙 생성
                obj['id'] = self.next_id
                self.tracks[self.next_id] = {
                    'centroid': obj['centroid'],
                    'bbox': obj['bbox'],
                    'frames': 1
                }
                self.next_id += 1

        # 오래된 트랙 제거
        to_remove = [tid for tid in self.tracks if tid not in matched]
        for tid in to_remove:
            if self.tracks[tid]['frames'] < 10:  # 짧은 트랙은 바로 제거
                del self.tracks[tid]

    def draw(self, frame, objects):
        """시각화"""
        for obj in objects:
            x, y, w, h = obj['bbox']
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

            if 'id' in obj:
                cv2.putText(frame, f"ID: {obj['id']}", (x, y-10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return frame

# 사용 예
cap = cv2.VideoCapture(0)
tracker = MotionTracker()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    mask, objects = tracker.process(frame)
    output = tracker.draw(frame, objects)

    cv2.imshow('Motion Tracking', output)
    cv2.imshow('Mask', mask)

    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()

8. 연습 문제

문제 1: 비디오 플레이어

기본적인 비디오 플레이어를 구현하세요.

요구사항: - 재생/일시정지 토글 (스페이스바) - 앞으로/뒤로 건너뛰기 (방향키) - 프레임 단위 이동 (./,) - 현재 시간/총 시간 표시 - 프로그레스 바

힌트
# 프레임 이동
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)

# 키 처리
key = cv2.waitKey(delay) & 0xFF
if key == ord(' '):  # 스페이스바
    paused = not paused
elif key == 83:  # 오른쪽 화살표
    skip_forward()

문제 2: 움직임 히트맵

비디오에서 움직임이 많은 영역을 히트맵으로 시각화하세요.

요구사항: - 배경 차분으로 움직임 검출 - 누적 움직임 맵 생성 - 컬러맵 적용 (COLORMAP_JET) - 원본과 히트맵 블렌딩

힌트
# 누적 맵 초기화
accumulator = np.zeros((height, width), dtype=np.float32)

# 프레임마다 누적
accumulator += fg_mask.astype(np.float32) / 255.0

# 정규화 및 컬러맵 적용
normalized = cv2.normalize(accumulator, None, 0, 255, cv2.NORM_MINMAX)
heatmap = cv2.applyColorMap(normalized.astype(np.uint8), cv2.COLORMAP_JET)

문제 3: 속도 측정

옵티컬 플로우를 이용해 객체의 이동 속도를 측정하세요.

요구사항: - 특정 ROI 내 평균 플로우 계산 - 픽셀 속도를 실제 속도로 변환 (캘리브레이션 필요) - 속도 그래프 실시간 표시

힌트
# ROI 내 평균 플로우
roi_flow = flow[y:y+h, x:x+w]
avg_flow = np.mean(roi_flow, axis=(0, 1))

# 속도 계산 (픽셀/프레임)
speed = np.sqrt(avg_flow[0]**2 + avg_flow[1]**2)

# 실제 속도 변환 (예: 1픽셀 = 1cm, 30fps)
real_speed = speed * pixels_to_cm * fps  # cm/s

문제 4: 차량 계수기

도로 비디오에서 통과하는 차량을 계수하세요.

요구사항: - 배경 차분으로 차량 검출 - 가상 선 설정 (계수 라인) - 선을 통과하는 객체 계수 - 진입/퇴장 방향 구분

힌트
# 가상 선 정의
line_y = height // 2

# 객체가 선을 통과했는지 확인
def crossed_line(prev_y, curr_y, line_y):
    # 위에서 아래로
    if prev_y < line_y and curr_y >= line_y:
        return 'down'
    # 아래에서 위로
    if prev_y > line_y and curr_y <= line_y:
        return 'up'
    return None

문제 5: 동작 인식

옵티컬 플로우 패턴을 분석하여 간단한 동작(손 흔들기, 원 그리기)을 인식하세요.

요구사항: - 손 영역 검출 (피부색 기반) - 움직임 패턴 추적 - 패턴 분류 (규칙 기반 또는 템플릿 매칭) - 인식된 동작 표시

힌트
# 피부색 검출 (HSV)
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
lower_skin = np.array([0, 20, 70])
upper_skin = np.array([20, 255, 255])
mask = cv2.inRange(hsv, lower_skin, upper_skin)

# 움직임 궤적 저장
trajectory = []
trajectory.append(centroid)

# 궤적 분석
# 손 흔들기: x 방향 진동
# 원 그리기: 시작점과 끝점이 가까움 + 일정 면적

다음 단계


참고 자료

to navigate between lessons