12_spectral_estimation.py

Download
python 457 lines 17.4 KB
  1#!/usr/bin/env python3
  2"""
  3Spectral Estimation Methods
  4=============================
  5
  6This script compares non-parametric and parametric spectral estimation
  7techniques applied to a signal with known spectral content:
  8
  9Non-parametric methods:
 10    1. Periodogram        — squared magnitude of FFT; high variance, good resolution
 11    2. Bartlett's method  — average of non-overlapping periodograms; reduces variance
 12    3. Welch's method     — overlapping windowed segments; variance vs. resolution tradeoff
 13
 14Parametric methods:
 15    4. AR model (Yule-Walker) — models signal as output of all-pole filter driven by
 16                                white noise; high resolution for short data records
 17
 18Key Concepts:
 19    - Variance and resolution are fundamentally traded off in spectral estimation
 20    - Averaging reduces variance by factor K (segments), but reduces resolution
 21      by same factor (segment length N/K vs full N)
 22    - Windowing reduces spectral leakage at cost of main-lobe width
 23    - AR model can resolve closely-spaced peaks that DFT-based methods blur
 24    - Yule-Walker equations: R * a = -r  (autocorrelation matrix system)
 25
 26Author: Educational example for Signal Processing
 27License: MIT
 28"""
 29
 30import numpy as np
 31from scipy import signal
 32from scipy.linalg import toeplitz
 33import matplotlib.pyplot as plt
 34
 35
 36# ============================================================================
 37# SIGNAL GENERATION
 38# ============================================================================
 39
 40def make_test_signal(fs=1000.0, duration=2.0, seed=42):
 41    """
 42    Construct a test signal with known spectral content:
 43        - Two closely-spaced sinusoids at 100 and 120 Hz
 44        - One isolated sinusoid at 300 Hz
 45        - White Gaussian noise (SNR ≈ 10 dB)
 46
 47    This combination tests each estimator's ability to:
 48      (a) resolve two nearby peaks (resolution test)
 49      (b) correctly locate an isolated peak (frequency accuracy test)
 50
 51    Args:
 52        fs       (float): Sampling rate in Hz
 53        duration (float): Signal length in seconds
 54        seed     (int)  : RNG seed for reproducibility
 55
 56    Returns:
 57        (t, x): time vector and signal ndarray
 58    """
 59    rng = np.random.default_rng(seed)
 60    t = np.arange(0, duration, 1.0 / fs)
 61    N = len(t)
 62
 63    # Sinusoidal components
 64    x = (1.0 * np.sin(2 * np.pi * 100 * t) +
 65         0.8 * np.sin(2 * np.pi * 120 * t) +
 66         0.6 * np.sin(2 * np.pi * 300 * t))
 67
 68    # White noise — std chosen so broadband SNR ≈ 10 dB
 69    noise_std = 0.3
 70    x += noise_std * rng.standard_normal(N)
 71
 72    return t, x
 73
 74
 75# ============================================================================
 76# NON-PARAMETRIC ESTIMATORS
 77# ============================================================================
 78
 79def periodogram(x, fs):
 80    """
 81    Classic periodogram: P(k) = |X(k)|² / N.
 82
 83    The periodogram has the same resolution as the DFT (Δf = fs/N),
 84    but its variance does NOT decrease as N increases — it remains
 85    proportional to the true spectrum squared at each frequency.
 86
 87    Args:
 88        x  (ndarray): Input signal (length N)
 89        fs (float)  : Sampling rate (Hz)
 90
 91    Returns:
 92        (freqs, Pxx): frequency axis and one-sided PSD (V²/Hz)
 93    """
 94    N = len(x)
 95    # No windowing (rectangular window) → highest frequency resolution
 96    X = np.fft.rfft(x) / N
 97    Pxx = 2 * np.abs(X) ** 2 / (fs / N)   # one-sided PSD: multiply by 2 (except DC/Nyq)
 98    Pxx[0] /= 2      # DC bin is not doubled
 99    if N % 2 == 0:
100        Pxx[-1] /= 2  # Nyquist bin (only if N even)
101    freqs = np.fft.rfftfreq(N, d=1.0 / fs)
102    return freqs, Pxx
103
104
105def bartlett(x, fs, K):
106    """
107    Bartlett's method: average K non-overlapping periodograms.
108
109    Divides x into K non-overlapping segments of length L = N//K.
110    Each segment's periodogram is computed (rectangular window),
111    and the K periodograms are averaged.
112
113    Effect:
114        - Variance reduced by factor K  (σ² → σ²/K)
115        - Frequency resolution reduced: Δf = fs/L = fs*K/N  (worse by K)
116
117    Args:
118        x  (ndarray): Input signal
119        fs (float)  : Sampling rate
120        K  (int)    : Number of non-overlapping segments
121
122    Returns:
123        (freqs, Pxx_avg): frequency axis and averaged PSD
124    """
125    N = len(x)
126    L = N // K   # segment length
127
128    freqs = np.fft.rfftfreq(L, d=1.0 / fs)
129    Pxx_sum = np.zeros(len(freqs))
130
131    for k in range(K):
132        seg = x[k * L:(k + 1) * L]
133        _, P_seg = periodogram(seg, fs)
134        # P_seg may differ in length if rfftfreq gives different bins;
135        # take the minimum length to be safe
136        n = min(len(Pxx_sum), len(P_seg))
137        Pxx_sum[:n] += P_seg[:n]
138
139    Pxx_avg = Pxx_sum / K
140    return freqs, Pxx_avg
141
142
143def welch_method(x, fs, nperseg, noverlap=None, window='hann'):
144    """
145    Welch's method: overlapping windowed periodograms averaged.
146
147    Improvements over Bartlett:
148      - Overlapping segments (typically 50 %) increase the number of
149        averages without reducing the segment length as much.
150      - Windowing (e.g. Hann) reduces spectral leakage.
151
152    The resolution-variance tradeoff is parameterized by nperseg:
153      - Larger nperseg → better resolution, higher variance
154      - Smaller nperseg → worse resolution, lower variance
155
156    Uses scipy.signal.welch for a robust, well-tested implementation.
157
158    Args:
159        x        (ndarray): Input signal
160        fs       (float)  : Sampling rate (Hz)
161        nperseg  (int)    : Samples per segment
162        noverlap (int)    : Overlap samples (default: nperseg//2)
163        window   (str)    : Window function name
164
165    Returns:
166        (freqs, Pxx): scipy.signal.welch output
167    """
168    if noverlap is None:
169        noverlap = nperseg // 2
170    freqs, Pxx = signal.welch(x, fs=fs, window=window,
171                               nperseg=nperseg, noverlap=noverlap,
172                               scaling='density')
173    return freqs, Pxx
174
175
176# ============================================================================
177# PARAMETRIC ESTIMATOR: AR / YULE-WALKER
178# ============================================================================
179
180def ar_yule_walker(x, order, fs):
181    """
182    Parametric spectral estimate using an AR(p) model.
183
184    The AR model assumes the signal is the output of an all-pole IIR
185    filter driven by white noise:
186
187        x[n] = -a[1]*x[n-1] - ... - a[p]*x[n-p] + e[n]
188
189    The Yule-Walker equations express this in matrix form:
190        R_xx * a = -r_xx
191    where R_xx is the p×p autocorrelation (Toeplitz) matrix and r_xx
192    is the vector of lags 1…p.
193
194    Once 'a' is found, the PSD estimate is:
195        P(ω) = σ²_e / |A(e^jω)|²
196    where A(e^jω) = 1 + a[1]e^{-jω} + … + a[p]e^{-jpω}.
197
198    This method can resolve closely-spaced peaks far better than DFT-based
199    methods when the model order matches the signal structure.
200
201    Args:
202        x     (ndarray): Input signal
203        order (int)    : AR model order p (number of poles)
204        fs    (float)  : Sampling rate (Hz)
205
206    Returns:
207        (freqs, Pxx): frequency axis and PSD estimate
208    """
209    N = len(x)
210
211    # Estimate autocorrelation lags 0 … order (biased estimator)
212    r = np.array([np.dot(x[:N - k], x[k:]) / N for k in range(order + 1)])
213
214    # Build Toeplitz matrix from lags 0 … order-1
215    R = toeplitz(r[:order])     # p × p symmetric Toeplitz
216    r_vec = r[1:order + 1]      # lags 1 … p
217
218    # Solve Yule-Walker: R * a = -r_vec
219    a = np.linalg.solve(R, -r_vec)   # AR coefficients a[1]…a[p]
220
221    # Noise variance: σ²_e = r[0] + a · r[1:p+1]
222    sigma2 = r[0] + np.dot(a, r_vec)
223
224    # Evaluate AR PSD on dense frequency grid
225    nfft = 4096
226    # Transfer function denominator: A(z) = 1 + a[0]*z^{-1} + … + a[p-1]*z^{-p}
227    A_coeffs = np.concatenate([[1.0], a])
228    _, H = signal.freqz(1.0, A_coeffs, worN=nfft // 2 + 1, fs=fs)
229
230    # PSD = sigma^2 / |H(ω)|^2  — already one-sided from freqz on [0, Nyq]
231    freqs = np.linspace(0, fs / 2, nfft // 2 + 1)
232    Pxx = sigma2 / (fs / 2) * np.abs(H) ** (-2)   # normalize to V²/Hz
233    # Note: we use |A|^{-2} = |H|^2 since H = 1/A for an all-pole filter
234
235    return freqs, Pxx
236
237
238# ============================================================================
239# ANALYSIS HELPERS
240# ============================================================================
241
242def find_peaks_above(freqs, Pxx_db, threshold_db=-20.0):
243    """Return (freq, power_dB) pairs for peaks above threshold."""
244    peak_idx, _ = signal.find_peaks(Pxx_db, height=threshold_db, distance=5)
245    return [(freqs[i], Pxx_db[i]) for i in peak_idx]
246
247
248def print_method_summary(results, true_freqs):
249    """Print detected peak frequencies for each method."""
250    print("\n" + "=" * 65)
251    print("PEAK DETECTION SUMMARY")
252    print(f"True sinusoid frequencies: {true_freqs} Hz")
253    print("=" * 65)
254    print(f"{'Method':<20} {'Detected peaks (Hz)':>40}")
255    print("-" * 65)
256
257    for name, (freqs, Pxx) in results.items():
258        Pxx_db = 10 * np.log10(np.maximum(Pxx, 1e-20))
259        threshold = np.max(Pxx_db) - 25   # peaks within 25 dB of max
260        peaks = find_peaks_above(freqs, Pxx_db, threshold_db=threshold)
261        peak_str = ', '.join(f'{f:.1f}' for f, _ in sorted(peaks))
262        print(f"  {name:<18} {peak_str:>38}")
263
264
265# ============================================================================
266# ENTRY POINT
267# ============================================================================
268
269if __name__ == "__main__":
270    print("SPECTRAL ESTIMATION METHODS")
271
272    # Signal parameters
273    FS = 1000.0        # Hz
274    DURATION = 2.0     # seconds
275    TRUE_FREQS = [100, 120, 300]   # Hz (known ground truth)
276    AR_ORDER = 12      # AR model order — choose ≥ 2× number of sinusoids
277
278    t, x = make_test_signal(fs=FS, duration=DURATION)
279    N = len(x)
280
281    print(f"\nSignal: {N} samples at {FS} Hz  ({DURATION} s)")
282    print(f"True frequencies: {TRUE_FREQS} Hz  (100 and 120 Hz are closely spaced)")
283    print(f"White noise std: 0.3")
284
285    # -----------------------------------------------------------------------
286    # 1. Periodogram
287    # -----------------------------------------------------------------------
288    print("\n" + "=" * 65)
289    print("1. PERIODOGRAM  (rectangular window, no averaging)")
290    print("=" * 65)
291    f_pgram, P_pgram = periodogram(x, FS)
292    print(f"   Resolution: Δf = {FS / N:.3f} Hz")
293    print(f"   Variance  : high (not reduced by N)")
294
295    # -----------------------------------------------------------------------
296    # 2. Bartlett's method
297    # -----------------------------------------------------------------------
298    print("\n" + "=" * 65)
299    print("2. BARTLETT'S METHOD  (non-overlapping segments)")
300    print("=" * 65)
301    K_bartlett = 8   # 8 segments → variance reduced by 8
302    f_bart, P_bart = bartlett(x, FS, K_bartlett)
303    L_bart = N // K_bartlett
304    print(f"   Segments K = {K_bartlett}  →  segment length L = {L_bart}")
305    print(f"   Resolution  : Δf = {FS / L_bart:.3f} Hz  (×{K_bartlett} worse than periodogram)")
306    print(f"   Variance    : reduced by factor {K_bartlett}")
307
308    # -----------------------------------------------------------------------
309    # 3. Welch's method — multiple segment lengths
310    # -----------------------------------------------------------------------
311    print("\n" + "=" * 65)
312    print("3. WELCH'S METHOD  (overlapping Hann windows)")
313    print("=" * 65)
314    welch_configs = [
315        ('Welch (large seg)', N // 2),    # fewer averages, better resolution
316        ('Welch (small seg)', N // 8),    # more averages, lower variance
317    ]
318    welch_results = {}
319    for label, nperseg in welch_configs:
320        f_w, P_w = welch_method(x, FS, nperseg=nperseg)
321        n_seg_eff = 2 * N // nperseg - 1   # approx effective segments (50 % overlap)
322        welch_results[label] = (f_w, P_w)
323        print(f"   {label}: nperseg={nperseg}, Δf={FS/nperseg:.2f} Hz, "
324              f"≈{n_seg_eff} effective averages")
325
326    # -----------------------------------------------------------------------
327    # 4. AR Yule-Walker
328    # -----------------------------------------------------------------------
329    print("\n" + "=" * 65)
330    print(f"4. AR MODEL (Yule-Walker, order p={AR_ORDER})")
331    print("=" * 65)
332    f_ar, P_ar = ar_yule_walker(x, order=AR_ORDER, fs=FS)
333    print(f"   Model order p = {AR_ORDER}  (rule of thumb: p ≥ N/3 for short records)")
334    print(f"   Resolution    : super-resolution — can distinguish sub-DFT peaks")
335    print(f"   Caution       : wrong order → spurious peaks or missed peaks")
336
337    # -----------------------------------------------------------------------
338    # Print peak summary
339    # -----------------------------------------------------------------------
340    all_results = {
341        'Periodogram': (f_pgram, P_pgram),
342        'Bartlett': (f_bart, P_bart),
343        welch_configs[0][0]: welch_results[welch_configs[0][0]],
344        welch_configs[1][0]: welch_results[welch_configs[1][0]],
345        'AR Yule-Walker': (f_ar, P_ar),
346    }
347    print_method_summary(all_results, TRUE_FREQS)
348
349    # -----------------------------------------------------------------------
350    # VISUALIZATION — 3×2 grid
351    # -----------------------------------------------------------------------
352    fig, axes = plt.subplots(3, 2, figsize=(13, 12))
353    fig.suptitle('Spectral Estimation Methods Comparison', fontsize=13, fontweight='bold')
354
355    # Helper: dB conversion with floor
356    def to_db(P):
357        return 10 * np.log10(np.maximum(P, 1e-20))
358
359    # (0,0) Periodogram
360    ax = axes[0, 0]
361    ax.plot(f_pgram, to_db(P_pgram), 'b-', lw=0.8, label='Periodogram')
362    for f_true in TRUE_FREQS:
363        ax.axvline(f_true, color='r', ls='--', lw=1, alpha=0.7)
364    ax.set_xlabel('Frequency (Hz)')
365    ax.set_ylabel('PSD (dB re V²/Hz)')
366    ax.set_title(f'Periodogram (N={N}, Δf={FS/N:.2f} Hz)')
367    ax.legend(fontsize=9)
368    ax.grid(True, alpha=0.3)
369    ax.set_xlim(0, FS / 2)
370
371    # (0,1) Periodogram — zoom in on 80–140 Hz to show 100 vs 120 Hz
372    ax = axes[0, 1]
373    ax.plot(f_pgram, to_db(P_pgram), 'b-', lw=1.2, label='Periodogram')
374    for f_true in [100, 120]:
375        ax.axvline(f_true, color='r', ls='--', lw=1.2, alpha=0.8, label=f'{f_true} Hz')
376    ax.set_xlabel('Frequency (Hz)')
377    ax.set_ylabel('PSD (dB re V²/Hz)')
378    ax.set_title('Zoom: 80–140 Hz (can periodogram resolve 100 & 120 Hz?)')
379    ax.legend(fontsize=8)
380    ax.grid(True, alpha=0.3)
381    ax.set_xlim(80, 140)
382
383    # (1,0) Bartlett vs Periodogram
384    ax = axes[1, 0]
385    ax.plot(f_pgram, to_db(P_pgram), 'gray', lw=0.8, alpha=0.6, label='Periodogram')
386    ax.plot(f_bart, to_db(P_bart), 'b-', lw=2,
387            label=f'Bartlett (K={K_bartlett}, Δf={FS/(N//K_bartlett):.1f} Hz)')
388    for f_true in TRUE_FREQS:
389        ax.axvline(f_true, color='r', ls='--', lw=1, alpha=0.7)
390    ax.set_xlabel('Frequency (Hz)')
391    ax.set_ylabel('PSD (dB re V²/Hz)')
392    ax.set_title(f"Bartlett: variance ↓ {K_bartlett}×, resolution ↓ {K_bartlett}×")
393    ax.legend(fontsize=9)
394    ax.grid(True, alpha=0.3)
395    ax.set_xlim(0, FS / 2)
396
397    # (1,1) Welch — two segment lengths
398    ax = axes[1, 1]
399    ax.plot(f_pgram, to_db(P_pgram), 'gray', lw=0.8, alpha=0.5, label='Periodogram')
400    wcolors = ['tab:blue', 'tab:orange']
401    for (label, _), color in zip(welch_configs, wcolors):
402        fw, Pw = welch_results[label]
403        ax.plot(fw, to_db(Pw), color=color, lw=1.8, label=label)
404    for f_true in TRUE_FREQS:
405        ax.axvline(f_true, color='r', ls='--', lw=1, alpha=0.7)
406    ax.set_xlabel('Frequency (Hz)')
407    ax.set_ylabel('PSD (dB re V²/Hz)')
408    ax.set_title('Welch: resolution vs. variance tradeoff')
409    ax.legend(fontsize=8)
410    ax.grid(True, alpha=0.3)
411    ax.set_xlim(0, FS / 2)
412
413    # (2,0) AR Yule-Walker vs Welch (full range)
414    ax = axes[2, 0]
415    f_w_large, P_w_large = welch_results[welch_configs[0][0]]
416    ax.plot(f_w_large, to_db(P_w_large), 'b-', lw=1.5, label='Welch (large seg)')
417    ax.plot(f_ar, to_db(P_ar), 'r--', lw=2,
418            label=f'AR Yule-Walker (p={AR_ORDER})')
419    for f_true in TRUE_FREQS:
420        ax.axvline(f_true, color='k', ls=':', lw=1.2, alpha=0.8)
421    ax.set_xlabel('Frequency (Hz)')
422    ax.set_ylabel('PSD (dB re V²/Hz)')
423    ax.set_title(f'AR(p={AR_ORDER}) vs Welch: parametric super-resolution')
424    ax.legend(fontsize=9)
425    ax.grid(True, alpha=0.3)
426    ax.set_xlim(0, FS / 2)
427
428    # (2,1) AR zoom — 80–140 Hz (resolution comparison)
429    ax = axes[2, 1]
430    ax.plot(f_pgram, to_db(P_pgram), 'gray', lw=1, alpha=0.6, label='Periodogram')
431    ax.plot(f_w_large, to_db(P_w_large), 'b-', lw=1.8, label='Welch (large seg)')
432    ax.plot(f_ar, to_db(P_ar), 'r--', lw=2, label=f'AR (p={AR_ORDER})')
433    for f_true in [100, 120]:
434        ax.axvline(f_true, color='k', ls=':', lw=1.5, alpha=0.8, label=f'{f_true} Hz')
435    ax.set_xlabel('Frequency (Hz)')
436    ax.set_ylabel('PSD (dB re V²/Hz)')
437    ax.set_title('Zoom 80–140 Hz: AR resolves 100 & 120 Hz; Welch may not')
438    ax.legend(fontsize=8)
439    ax.grid(True, alpha=0.3)
440    ax.set_xlim(80, 140)
441
442    plt.tight_layout()
443    out_path = '/opt/projects/01_Personal/03_Study/examples/Signal_Processing/12_spectral_estimation.png'
444    plt.savefig(out_path, dpi=150)
445    print(f"\nSaved visualization: {out_path}")
446    plt.close()
447
448    print("\n" + "=" * 65)
449    print("Key Takeaways:")
450    print("  - Periodogram: maximum resolution (Δf=fs/N), but high variance")
451    print("  - Bartlett: averaging K segments reduces variance by K, resolution K× worse")
452    print("  - Welch: 50% overlap + windowing — best practical non-parametric method")
453    print("  - AR / Yule-Walker: parametric, super-resolution for short records")
454    print("  - Resolution-variance tradeoff is fundamental (Heisenberg-like)")
455    print("  - AR order too small → merged peaks; too large → spurious peaks")
456    print("=" * 65)