06_fft_analysis.py

Download
python 394 lines 13.3 KB
  1#!/usr/bin/env python3
  2"""
  3Discrete Fourier Transform (DFT) and Fast Fourier Transform (FFT)
  4=================================================================
  5
  6The Discrete Fourier Transform of an N-point sequence x[n] is:
  7
  8    X[k] = sum_{n=0}^{N-1}  x[n] * exp(-j*2*pi*k*n/N),   k = 0, 1, ..., N-1
  9
 10The FFT is an efficient algorithm to compute the DFT in O(N log N) operations
 11instead of O(N^2) for the naive matrix-multiplication form.
 12
 13Topics Covered:
 14    1. DFT matrix multiplication vs. np.fft.fft (correctness check)
 15    2. Zero-padding: increase apparent frequency resolution
 16    3. Windowing: reduce spectral leakage (rect / Hanning / Hamming / Blackman)
 17    4. Multi-tone signal analysis with FFT
 18    5. Spectral leakage and the effect of different windows
 19    6. FFT vs. DFT computation-time comparison
 20
 21Author: Educational example for Signal Processing
 22License: MIT
 23"""
 24
 25import time
 26import numpy as np
 27import matplotlib.pyplot as plt
 28from matplotlib.gridspec import GridSpec
 29
 30
 31# =============================================================================
 32# 1. DFT via matrix multiplication (O(N^2) — for illustration only)
 33# =============================================================================
 34
 35def dft_matrix(N):
 36    """
 37    Build the N×N DFT matrix W where W[k,n] = exp(-j*2*pi*k*n/N).
 38
 39    X = W @ x  gives the DFT of x.
 40
 41    Args:
 42        N (int): DFT size
 43
 44    Returns:
 45        ndarray: Complex N×N DFT matrix
 46    """
 47    n = np.arange(N)
 48    k = n.reshape((N, 1))    # column vector of output indices
 49    # W[k,n] = exp(-j*2*pi*k*n/N)
 50    W = np.exp(-1j * 2 * np.pi * k * n / N)
 51    return W
 52
 53
 54def dft_slow(x):
 55    """
 56    Compute DFT using explicit matrix multiplication (O(N^2)).
 57
 58    Args:
 59        x (ndarray): Input sequence of length N
 60
 61    Returns:
 62        ndarray: DFT coefficients X[k]
 63    """
 64    N = len(x)
 65    W = dft_matrix(N)
 66    return W @ x
 67
 68
 69# =============================================================================
 70# 2. Window functions
 71# =============================================================================
 72
 73WINDOWS = {
 74    'Rectangular': lambda N: np.ones(N),
 75    'Hanning':     lambda N: np.hanning(N),
 76    'Hamming':     lambda N: np.hamming(N),
 77    'Blackman':    lambda N: np.blackman(N),
 78}
 79
 80
 81def apply_window(x, window_name):
 82    """
 83    Apply a named window function to signal x.
 84
 85    Args:
 86        x           (ndarray): Input signal
 87        window_name (str):     One of 'Rectangular', 'Hanning', 'Hamming', 'Blackman'
 88
 89    Returns:
 90        ndarray: Windowed signal
 91    """
 92    w = WINDOWS[window_name](len(x))
 93    return x * w
 94
 95
 96# =============================================================================
 97# 3. Demonstration functions
 98# =============================================================================
 99
100def demo_dft_vs_fft():
101    """
102    Verify that DFT matrix multiplication and np.fft.fft give identical results.
103
104    Also compare computation times on increasing signal lengths.
105    """
106    print("=" * 60)
107    print("Demo 1: DFT (Matrix) vs. FFT (numpy) — Correctness & Speed")
108    print("=" * 60)
109
110    # Correctness check on a small signal
111    N = 32
112    np.random.seed(42)
113    x = np.random.randn(N) + 1j * np.random.randn(N)
114
115    X_dft = dft_slow(x)
116    X_fft = np.fft.fft(x)
117    max_error = np.max(np.abs(X_dft - X_fft))
118    print(f"  N={N}: max |DFT - FFT| = {max_error:.2e}  (should be ~machine epsilon)")
119
120    # Timing comparison
121    sizes = [64, 128, 256, 512, 1024, 2048]
122    times_dft = []
123    times_fft = []
124
125    for N in sizes:
126        x = np.random.randn(N)
127        # Time DFT (matrix multiply)
128        reps = max(1, 200 // N)
129        t0 = time.perf_counter()
130        for _ in range(reps):
131            dft_slow(x)
132        times_dft.append((time.perf_counter() - t0) / reps * 1000)
133        # Time FFT
134        reps_fft = 10000
135        t0 = time.perf_counter()
136        for _ in range(reps_fft):
137            np.fft.fft(x)
138        times_fft.append((time.perf_counter() - t0) / reps_fft * 1000)
139
140    print(f"\n  {'N':>6}  {'DFT (ms)':>12}  {'FFT (ms)':>12}  {'Speedup':>10}")
141    print(f"  {'-'*46}")
142    for N, t_d, t_f in zip(sizes, times_dft, times_fft):
143        print(f"  {N:>6}  {t_d:>12.4f}  {t_f:>12.6f}  {t_d/t_f:>9.1f}x")
144
145    # Plot timing
146    fig, ax = plt.subplots(figsize=(8, 5))
147    ax.loglog(sizes, times_dft, 'ro-', label='DFT O(N²)')
148    ax.loglog(sizes, times_fft, 'bs-', label='FFT O(N log N)')
149    # Reference curves
150    n_arr = np.array(sizes, dtype=float)
151    scale_dft = times_dft[0] / (sizes[0] ** 2)
152    scale_fft = times_fft[0] / (sizes[0] * np.log2(sizes[0]))
153    ax.loglog(n_arr, scale_dft * n_arr ** 2, 'r--', alpha=0.5, label='∝ N²')
154    ax.loglog(n_arr, scale_fft * n_arr * np.log2(n_arr), 'b--', alpha=0.5, label='∝ N log N')
155    ax.set_xlabel("Signal length N")
156    ax.set_ylabel("Time (ms)")
157    ax.set_title("DFT (matrix) vs. FFT Computation Time")
158    ax.legend()
159    ax.grid(True, which='both', alpha=0.3)
160    plt.tight_layout()
161    plt.savefig('/tmp/06_dft_vs_fft_timing.png', dpi=150)
162    print("\n  Saved timing plot to /tmp/06_dft_vs_fft_timing.png")
163    plt.show()
164
165
166def demo_zero_padding():
167    """
168    Show how zero-padding interpolates the DFT spectrum, giving finer
169    frequency resolution in the displayed spectrum.
170
171    IMPORTANT: Zero-padding does NOT add new information — it only
172    interpolates the existing DFT bins, making peaks easier to locate.
173    """
174    print("\n" + "=" * 60)
175    print("Demo 2: Zero-Padding for Frequency Resolution")
176    print("=" * 60)
177
178    fs = 100.0    # Hz
179    N = 64        # original number of samples
180    t = np.arange(N) / fs
181
182    # Signal: two closely spaced tones
183    f1, f2 = 10.0, 13.5   # Hz
184    x = np.sin(2 * np.pi * f1 * t) + np.sin(2 * np.pi * f2 * t)
185
186    zero_pad_lengths = [N, 2*N, 4*N, 16*N]
187    labels = [f'N={n}{n//N})' for n in zero_pad_lengths]
188
189    print(f"  Signal: {f1} Hz + {f2} Hz, N={N} samples, fs={fs} Hz")
190    print(f"  Frequency resolution without padding: Δf = {fs/N:.2f} Hz")
191
192    fig, axes = plt.subplots(2, 2, figsize=(12, 8))
193    fig.suptitle("Zero-Padding: Interpolating the DFT Spectrum", fontsize=14, fontweight='bold')
194
195    for ax, nfft, label in zip(axes.flat, zero_pad_lengths, labels):
196        # Pad signal to nfft points
197        x_padded = np.zeros(nfft)
198        x_padded[:N] = x
199        X = np.fft.rfft(x_padded)
200        f = np.fft.rfftfreq(nfft, d=1.0 / fs)
201        mag = (2.0 / N) * np.abs(X)   # normalize by original N
202
203        df = fs / nfft
204        ax.plot(f, mag, label=f'Δf = {df:.2f} Hz')
205        ax.axvline(f1, color='red',  linestyle='--', alpha=0.6, label=f'{f1} Hz')
206        ax.axvline(f2, color='blue', linestyle='--', alpha=0.6, label=f'{f2} Hz')
207        ax.set_xlim(0, 30)
208        ax.set_title(f"NFFT = {nfft} ({label})")
209        ax.set_xlabel("Frequency (Hz)")
210        ax.set_ylabel("Amplitude")
211        ax.legend(fontsize=8)
212        ax.grid(True, alpha=0.3)
213
214    plt.tight_layout()
215    plt.savefig('/tmp/06_zero_padding.png', dpi=150)
216    print("  Saved plot to /tmp/06_zero_padding.png")
217    plt.show()
218
219
220def demo_windowing_and_leakage():
221    """
222    Demonstrate spectral leakage and how windowing reduces it.
223
224    Spectral leakage occurs when a non-integer number of cycles fits in
225    the analysis window, causing energy to spread across many bins.
226
227    Windows trade frequency resolution for reduced sidelobe level.
228    """
229    print("\n" + "=" * 60)
230    print("Demo 3: Spectral Leakage and Window Functions")
231    print("=" * 60)
232
233    fs = 100.0
234    N = 128
235    t = np.arange(N) / fs
236
237    # Signal: strong tone at 10 Hz + weak tone at 18.7 Hz (non-integer bins)
238    # The non-integer frequency causes severe leakage with rectangular window
239    f_strong, f_weak = 10.0, 18.7
240    A_strong, A_weak = 1.0, 0.02
241    x = A_strong * np.sin(2 * np.pi * f_strong * t) + A_weak * np.sin(2 * np.pi * f_weak * t)
242
243    print(f"  Signal: {A_strong}·sin(2π·{f_strong}·t) + {A_weak}·sin(2π·{f_weak}·t)")
244    print(f"  Goal: detect weak {f_weak} Hz tone buried under strong {f_strong} Hz leakage\n")
245
246    nfft = 4 * N   # zero-pad for display resolution
247    f = np.fft.rfftfreq(nfft, d=1.0 / fs)
248
249    colors = ['b', 'g', 'r', 'purple']
250    fig, axes = plt.subplots(2, 2, figsize=(13, 9))
251    fig.suptitle("Spectral Leakage and Windowing", fontsize=14, fontweight='bold')
252
253    window_names = list(WINDOWS.keys())
254    for ax, wname, color in zip(axes.flat, window_names, colors):
255        x_win = apply_window(x, wname)
256        w = WINDOWS[wname](N)
257
258        # Pad and FFT
259        x_padded = np.zeros(nfft)
260        x_padded[:N] = x_win
261        X = np.fft.rfft(x_padded)
262        # Normalize by sum of window coefficients
263        mag_db = 20 * np.log10(np.abs(X) / (np.sum(w) / 2) + 1e-12)
264
265        ax.plot(f, mag_db, color=color, lw=1.5, label=wname)
266        ax.axvline(f_weak, color='orange', linestyle='--', alpha=0.7,
267                   label=f'Weak tone {f_weak} Hz')
268        ax.axhline(20 * np.log10(A_weak), color='gray', linestyle=':', alpha=0.5,
269                   label=f'True level ({20*np.log10(A_weak):.0f} dB)')
270        ax.set_xlim(0, 40)
271        ax.set_ylim(-80, 5)
272        ax.set_title(wname)
273        ax.set_xlabel("Frequency (Hz)")
274        ax.set_ylabel("Magnitude (dB)")
275        ax.legend(fontsize=8)
276        ax.grid(True, alpha=0.3)
277
278        # Report peak near weak tone
279        mask_weak = (f > 16) & (f < 22)
280        peak_db = np.max(mag_db[mask_weak]) if mask_weak.any() else -80
281        print(f"  {wname:12s}: weak tone peak near {f_weak} Hz = {peak_db:.1f} dB")
282
283    plt.tight_layout()
284    plt.savefig('/tmp/06_windowing.png', dpi=150)
285    print("\n  Saved plot to /tmp/06_windowing.png")
286    plt.show()
287
288
289def demo_multitone_analysis():
290    """
291    Use FFT to identify frequency components in a multi-tone audio-like signal.
292
293    Signal contains 5 tones of varying amplitudes and a noise floor.
294    This simulates a real spectrum analysis task.
295    """
296    print("\n" + "=" * 60)
297    print("Demo 4: Multi-Tone Signal Analysis")
298    print("=" * 60)
299
300    fs = 1000.0
301    duration = 2.0
302    N = int(fs * duration)
303    t = np.arange(N) / fs
304    np.random.seed(7)
305
306    # Define tones: (frequency_Hz, amplitude)
307    tones = [(50, 1.0), (120, 0.5), (230, 0.8), (340, 0.3), (470, 0.6)]
308    x = np.zeros(N)
309    for f_tone, amp in tones:
310        x += amp * np.sin(2 * np.pi * f_tone * t)
311    # Add Gaussian noise (SNR ~ 20 dB)
312    noise_level = 0.1
313    x += noise_level * np.random.randn(N)
314
315    print("  Tones: " + ", ".join(f"{f} Hz ({A})" for f, A in tones))
316    print(f"  Noise level: {noise_level}")
317
318    # Apply Hanning window before FFT
319    x_windowed = apply_window(x, 'Hanning')
320    w = np.hanning(N)
321    X = np.fft.rfft(x_windowed)
322    f = np.fft.rfftfreq(N, d=1.0 / fs)
323    mag = (2.0 / np.sum(w)) * np.abs(X)
324
325    # Find peaks (simple threshold)
326    threshold = 0.15
327    peak_mask = mag > threshold
328    peak_freqs = f[peak_mask]
329    peak_mags = mag[peak_mask]
330    # Keep only local maxima
331    from scipy.signal import find_peaks
332    peaks_idx, _ = find_peaks(mag, height=threshold, distance=20)
333
334    print(f"\n  Detected peaks (threshold = {threshold}):")
335    for idx in peaks_idx:
336        print(f"    f = {f[idx]:.1f} Hz,  amplitude ≈ {mag[idx]:.3f}")
337
338    fig = plt.figure(figsize=(13, 7))
339    gs = GridSpec(2, 1, figure=fig, hspace=0.4)
340    fig.suptitle("Multi-Tone Signal Analysis (Hanning Window)", fontsize=14, fontweight='bold')
341
342    # Time domain
343    ax1 = fig.add_subplot(gs[0])
344    ax1.plot(t[:500], x[:500], lw=0.8, color='steelblue', label='Signal (first 0.5 s)')
345    ax1.set_xlabel("Time (s)")
346    ax1.set_ylabel("Amplitude")
347    ax1.set_title("Time Domain (zoomed)")
348    ax1.legend()
349    ax1.grid(True, alpha=0.3)
350
351    # Frequency domain
352    ax2 = fig.add_subplot(gs[1])
353    ax2.plot(f, mag, color='steelblue', lw=1.2, label='Magnitude spectrum')
354    ax2.plot(f[peaks_idx], mag[peaks_idx], 'rv', markersize=10, label='Detected peaks')
355    for idx in peaks_idx:
356        ax2.annotate(f'{f[idx]:.0f} Hz', xy=(f[idx], mag[idx]),
357                     xytext=(0, 8), textcoords='offset points',
358                     ha='center', fontsize=8, color='red')
359    ax2.axhline(threshold, color='orange', linestyle='--', alpha=0.7, label=f'Threshold={threshold}')
360    ax2.set_xlim(0, fs / 2)
361    ax2.set_xlabel("Frequency (Hz)")
362    ax2.set_ylabel("Amplitude")
363    ax2.set_title("Frequency Domain (FFT with Hanning window)")
364    ax2.legend()
365    ax2.grid(True, alpha=0.3)
366
367    plt.savefig('/tmp/06_multitone.png', dpi=150)
368    print("  Saved plot to /tmp/06_multitone.png")
369    plt.show()
370
371
372if __name__ == "__main__":
373    print("Discrete Fourier Transform (DFT) and FFT Analysis")
374    print("=" * 60)
375    print("DFT:  X[k] = sum_{n=0}^{N-1} x[n] * exp(-j2πkn/N)")
376    print("FFT:  Same result, but O(N log N) instead of O(N²)\n")
377
378    demo_dft_vs_fft()
379    demo_zero_padding()
380    demo_windowing_and_leakage()
381    demo_multitone_analysis()
382
383    print("\n" + "=" * 60)
384    print("Key Takeaways:")
385    print("  - DFT via matrix multiply and FFT give identical results")
386    print("  - FFT speedup grows with N: O(N²) → O(N log N)")
387    print("  - Zero-padding interpolates spectrum (more display bins)")
388    print("    but does NOT improve true frequency resolution (Δf = fs/N_orig)")
389    print("  - Spectral leakage: non-integer-period signals smear energy")
390    print("  - Windows reduce leakage at cost of slightly wider main lobe:")
391    print("      Rectangular < Hanning ≈ Hamming < Blackman (sidelobe suppression)")
392    print("  - Hanning or Hamming are good default choices for spectrum analysis")
393    print("=" * 60)