1#!/usr/bin/env python3
2"""
3Spectral Estimation Methods
4=============================
5
6This script compares non-parametric and parametric spectral estimation
7techniques applied to a signal with known spectral content:
8
9Non-parametric methods:
10 1. Periodogram — squared magnitude of FFT; high variance, good resolution
11 2. Bartlett's method — average of non-overlapping periodograms; reduces variance
12 3. Welch's method — overlapping windowed segments; variance vs. resolution tradeoff
13
14Parametric methods:
15 4. AR model (Yule-Walker) — models signal as output of all-pole filter driven by
16 white noise; high resolution for short data records
17
18Key Concepts:
19 - Variance and resolution are fundamentally traded off in spectral estimation
20 - Averaging reduces variance by factor K (segments), but reduces resolution
21 by same factor (segment length N/K vs full N)
22 - Windowing reduces spectral leakage at cost of main-lobe width
23 - AR model can resolve closely-spaced peaks that DFT-based methods blur
24 - Yule-Walker equations: R * a = -r (autocorrelation matrix system)
25
26Author: Educational example for Signal Processing
27License: MIT
28"""
29
30import numpy as np
31from scipy import signal
32from scipy.linalg import toeplitz
33import matplotlib.pyplot as plt
34
35
36# ============================================================================
37# SIGNAL GENERATION
38# ============================================================================
39
40def make_test_signal(fs=1000.0, duration=2.0, seed=42):
41 """
42 Construct a test signal with known spectral content:
43 - Two closely-spaced sinusoids at 100 and 120 Hz
44 - One isolated sinusoid at 300 Hz
45 - White Gaussian noise (SNR ≈ 10 dB)
46
47 This combination tests each estimator's ability to:
48 (a) resolve two nearby peaks (resolution test)
49 (b) correctly locate an isolated peak (frequency accuracy test)
50
51 Args:
52 fs (float): Sampling rate in Hz
53 duration (float): Signal length in seconds
54 seed (int) : RNG seed for reproducibility
55
56 Returns:
57 (t, x): time vector and signal ndarray
58 """
59 rng = np.random.default_rng(seed)
60 t = np.arange(0, duration, 1.0 / fs)
61 N = len(t)
62
63 # Sinusoidal components
64 x = (1.0 * np.sin(2 * np.pi * 100 * t) +
65 0.8 * np.sin(2 * np.pi * 120 * t) +
66 0.6 * np.sin(2 * np.pi * 300 * t))
67
68 # White noise — std chosen so broadband SNR ≈ 10 dB
69 noise_std = 0.3
70 x += noise_std * rng.standard_normal(N)
71
72 return t, x
73
74
75# ============================================================================
76# NON-PARAMETRIC ESTIMATORS
77# ============================================================================
78
79def periodogram(x, fs):
80 """
81 Classic periodogram: P(k) = |X(k)|² / N.
82
83 The periodogram has the same resolution as the DFT (Δf = fs/N),
84 but its variance does NOT decrease as N increases — it remains
85 proportional to the true spectrum squared at each frequency.
86
87 Args:
88 x (ndarray): Input signal (length N)
89 fs (float) : Sampling rate (Hz)
90
91 Returns:
92 (freqs, Pxx): frequency axis and one-sided PSD (V²/Hz)
93 """
94 N = len(x)
95 # No windowing (rectangular window) → highest frequency resolution
96 X = np.fft.rfft(x) / N
97 Pxx = 2 * np.abs(X) ** 2 / (fs / N) # one-sided PSD: multiply by 2 (except DC/Nyq)
98 Pxx[0] /= 2 # DC bin is not doubled
99 if N % 2 == 0:
100 Pxx[-1] /= 2 # Nyquist bin (only if N even)
101 freqs = np.fft.rfftfreq(N, d=1.0 / fs)
102 return freqs, Pxx
103
104
105def bartlett(x, fs, K):
106 """
107 Bartlett's method: average K non-overlapping periodograms.
108
109 Divides x into K non-overlapping segments of length L = N//K.
110 Each segment's periodogram is computed (rectangular window),
111 and the K periodograms are averaged.
112
113 Effect:
114 - Variance reduced by factor K (σ² → σ²/K)
115 - Frequency resolution reduced: Δf = fs/L = fs*K/N (worse by K)
116
117 Args:
118 x (ndarray): Input signal
119 fs (float) : Sampling rate
120 K (int) : Number of non-overlapping segments
121
122 Returns:
123 (freqs, Pxx_avg): frequency axis and averaged PSD
124 """
125 N = len(x)
126 L = N // K # segment length
127
128 freqs = np.fft.rfftfreq(L, d=1.0 / fs)
129 Pxx_sum = np.zeros(len(freqs))
130
131 for k in range(K):
132 seg = x[k * L:(k + 1) * L]
133 _, P_seg = periodogram(seg, fs)
134 # P_seg may differ in length if rfftfreq gives different bins;
135 # take the minimum length to be safe
136 n = min(len(Pxx_sum), len(P_seg))
137 Pxx_sum[:n] += P_seg[:n]
138
139 Pxx_avg = Pxx_sum / K
140 return freqs, Pxx_avg
141
142
143def welch_method(x, fs, nperseg, noverlap=None, window='hann'):
144 """
145 Welch's method: overlapping windowed periodograms averaged.
146
147 Improvements over Bartlett:
148 - Overlapping segments (typically 50 %) increase the number of
149 averages without reducing the segment length as much.
150 - Windowing (e.g. Hann) reduces spectral leakage.
151
152 The resolution-variance tradeoff is parameterized by nperseg:
153 - Larger nperseg → better resolution, higher variance
154 - Smaller nperseg → worse resolution, lower variance
155
156 Uses scipy.signal.welch for a robust, well-tested implementation.
157
158 Args:
159 x (ndarray): Input signal
160 fs (float) : Sampling rate (Hz)
161 nperseg (int) : Samples per segment
162 noverlap (int) : Overlap samples (default: nperseg//2)
163 window (str) : Window function name
164
165 Returns:
166 (freqs, Pxx): scipy.signal.welch output
167 """
168 if noverlap is None:
169 noverlap = nperseg // 2
170 freqs, Pxx = signal.welch(x, fs=fs, window=window,
171 nperseg=nperseg, noverlap=noverlap,
172 scaling='density')
173 return freqs, Pxx
174
175
176# ============================================================================
177# PARAMETRIC ESTIMATOR: AR / YULE-WALKER
178# ============================================================================
179
180def ar_yule_walker(x, order, fs):
181 """
182 Parametric spectral estimate using an AR(p) model.
183
184 The AR model assumes the signal is the output of an all-pole IIR
185 filter driven by white noise:
186
187 x[n] = -a[1]*x[n-1] - ... - a[p]*x[n-p] + e[n]
188
189 The Yule-Walker equations express this in matrix form:
190 R_xx * a = -r_xx
191 where R_xx is the p×p autocorrelation (Toeplitz) matrix and r_xx
192 is the vector of lags 1…p.
193
194 Once 'a' is found, the PSD estimate is:
195 P(ω) = σ²_e / |A(e^jω)|²
196 where A(e^jω) = 1 + a[1]e^{-jω} + … + a[p]e^{-jpω}.
197
198 This method can resolve closely-spaced peaks far better than DFT-based
199 methods when the model order matches the signal structure.
200
201 Args:
202 x (ndarray): Input signal
203 order (int) : AR model order p (number of poles)
204 fs (float) : Sampling rate (Hz)
205
206 Returns:
207 (freqs, Pxx): frequency axis and PSD estimate
208 """
209 N = len(x)
210
211 # Estimate autocorrelation lags 0 … order (biased estimator)
212 r = np.array([np.dot(x[:N - k], x[k:]) / N for k in range(order + 1)])
213
214 # Build Toeplitz matrix from lags 0 … order-1
215 R = toeplitz(r[:order]) # p × p symmetric Toeplitz
216 r_vec = r[1:order + 1] # lags 1 … p
217
218 # Solve Yule-Walker: R * a = -r_vec
219 a = np.linalg.solve(R, -r_vec) # AR coefficients a[1]…a[p]
220
221 # Noise variance: σ²_e = r[0] + a · r[1:p+1]
222 sigma2 = r[0] + np.dot(a, r_vec)
223
224 # Evaluate AR PSD on dense frequency grid
225 nfft = 4096
226 # Transfer function denominator: A(z) = 1 + a[0]*z^{-1} + … + a[p-1]*z^{-p}
227 A_coeffs = np.concatenate([[1.0], a])
228 _, H = signal.freqz(1.0, A_coeffs, worN=nfft // 2 + 1, fs=fs)
229
230 # PSD = sigma^2 / |H(ω)|^2 — already one-sided from freqz on [0, Nyq]
231 freqs = np.linspace(0, fs / 2, nfft // 2 + 1)
232 Pxx = sigma2 / (fs / 2) * np.abs(H) ** (-2) # normalize to V²/Hz
233 # Note: we use |A|^{-2} = |H|^2 since H = 1/A for an all-pole filter
234
235 return freqs, Pxx
236
237
238# ============================================================================
239# ANALYSIS HELPERS
240# ============================================================================
241
242def find_peaks_above(freqs, Pxx_db, threshold_db=-20.0):
243 """Return (freq, power_dB) pairs for peaks above threshold."""
244 peak_idx, _ = signal.find_peaks(Pxx_db, height=threshold_db, distance=5)
245 return [(freqs[i], Pxx_db[i]) for i in peak_idx]
246
247
248def print_method_summary(results, true_freqs):
249 """Print detected peak frequencies for each method."""
250 print("\n" + "=" * 65)
251 print("PEAK DETECTION SUMMARY")
252 print(f"True sinusoid frequencies: {true_freqs} Hz")
253 print("=" * 65)
254 print(f"{'Method':<20} {'Detected peaks (Hz)':>40}")
255 print("-" * 65)
256
257 for name, (freqs, Pxx) in results.items():
258 Pxx_db = 10 * np.log10(np.maximum(Pxx, 1e-20))
259 threshold = np.max(Pxx_db) - 25 # peaks within 25 dB of max
260 peaks = find_peaks_above(freqs, Pxx_db, threshold_db=threshold)
261 peak_str = ', '.join(f'{f:.1f}' for f, _ in sorted(peaks))
262 print(f" {name:<18} {peak_str:>38}")
263
264
265# ============================================================================
266# ENTRY POINT
267# ============================================================================
268
269if __name__ == "__main__":
270 print("SPECTRAL ESTIMATION METHODS")
271
272 # Signal parameters
273 FS = 1000.0 # Hz
274 DURATION = 2.0 # seconds
275 TRUE_FREQS = [100, 120, 300] # Hz (known ground truth)
276 AR_ORDER = 12 # AR model order — choose ≥ 2× number of sinusoids
277
278 t, x = make_test_signal(fs=FS, duration=DURATION)
279 N = len(x)
280
281 print(f"\nSignal: {N} samples at {FS} Hz ({DURATION} s)")
282 print(f"True frequencies: {TRUE_FREQS} Hz (100 and 120 Hz are closely spaced)")
283 print(f"White noise std: 0.3")
284
285 # -----------------------------------------------------------------------
286 # 1. Periodogram
287 # -----------------------------------------------------------------------
288 print("\n" + "=" * 65)
289 print("1. PERIODOGRAM (rectangular window, no averaging)")
290 print("=" * 65)
291 f_pgram, P_pgram = periodogram(x, FS)
292 print(f" Resolution: Δf = {FS / N:.3f} Hz")
293 print(f" Variance : high (not reduced by N)")
294
295 # -----------------------------------------------------------------------
296 # 2. Bartlett's method
297 # -----------------------------------------------------------------------
298 print("\n" + "=" * 65)
299 print("2. BARTLETT'S METHOD (non-overlapping segments)")
300 print("=" * 65)
301 K_bartlett = 8 # 8 segments → variance reduced by 8
302 f_bart, P_bart = bartlett(x, FS, K_bartlett)
303 L_bart = N // K_bartlett
304 print(f" Segments K = {K_bartlett} → segment length L = {L_bart}")
305 print(f" Resolution : Δf = {FS / L_bart:.3f} Hz (×{K_bartlett} worse than periodogram)")
306 print(f" Variance : reduced by factor {K_bartlett}")
307
308 # -----------------------------------------------------------------------
309 # 3. Welch's method — multiple segment lengths
310 # -----------------------------------------------------------------------
311 print("\n" + "=" * 65)
312 print("3. WELCH'S METHOD (overlapping Hann windows)")
313 print("=" * 65)
314 welch_configs = [
315 ('Welch (large seg)', N // 2), # fewer averages, better resolution
316 ('Welch (small seg)', N // 8), # more averages, lower variance
317 ]
318 welch_results = {}
319 for label, nperseg in welch_configs:
320 f_w, P_w = welch_method(x, FS, nperseg=nperseg)
321 n_seg_eff = 2 * N // nperseg - 1 # approx effective segments (50 % overlap)
322 welch_results[label] = (f_w, P_w)
323 print(f" {label}: nperseg={nperseg}, Δf={FS/nperseg:.2f} Hz, "
324 f"≈{n_seg_eff} effective averages")
325
326 # -----------------------------------------------------------------------
327 # 4. AR Yule-Walker
328 # -----------------------------------------------------------------------
329 print("\n" + "=" * 65)
330 print(f"4. AR MODEL (Yule-Walker, order p={AR_ORDER})")
331 print("=" * 65)
332 f_ar, P_ar = ar_yule_walker(x, order=AR_ORDER, fs=FS)
333 print(f" Model order p = {AR_ORDER} (rule of thumb: p ≥ N/3 for short records)")
334 print(f" Resolution : super-resolution — can distinguish sub-DFT peaks")
335 print(f" Caution : wrong order → spurious peaks or missed peaks")
336
337 # -----------------------------------------------------------------------
338 # Print peak summary
339 # -----------------------------------------------------------------------
340 all_results = {
341 'Periodogram': (f_pgram, P_pgram),
342 'Bartlett': (f_bart, P_bart),
343 welch_configs[0][0]: welch_results[welch_configs[0][0]],
344 welch_configs[1][0]: welch_results[welch_configs[1][0]],
345 'AR Yule-Walker': (f_ar, P_ar),
346 }
347 print_method_summary(all_results, TRUE_FREQS)
348
349 # -----------------------------------------------------------------------
350 # VISUALIZATION — 3×2 grid
351 # -----------------------------------------------------------------------
352 fig, axes = plt.subplots(3, 2, figsize=(13, 12))
353 fig.suptitle('Spectral Estimation Methods Comparison', fontsize=13, fontweight='bold')
354
355 # Helper: dB conversion with floor
356 def to_db(P):
357 return 10 * np.log10(np.maximum(P, 1e-20))
358
359 # (0,0) Periodogram
360 ax = axes[0, 0]
361 ax.plot(f_pgram, to_db(P_pgram), 'b-', lw=0.8, label='Periodogram')
362 for f_true in TRUE_FREQS:
363 ax.axvline(f_true, color='r', ls='--', lw=1, alpha=0.7)
364 ax.set_xlabel('Frequency (Hz)')
365 ax.set_ylabel('PSD (dB re V²/Hz)')
366 ax.set_title(f'Periodogram (N={N}, Δf={FS/N:.2f} Hz)')
367 ax.legend(fontsize=9)
368 ax.grid(True, alpha=0.3)
369 ax.set_xlim(0, FS / 2)
370
371 # (0,1) Periodogram — zoom in on 80–140 Hz to show 100 vs 120 Hz
372 ax = axes[0, 1]
373 ax.plot(f_pgram, to_db(P_pgram), 'b-', lw=1.2, label='Periodogram')
374 for f_true in [100, 120]:
375 ax.axvline(f_true, color='r', ls='--', lw=1.2, alpha=0.8, label=f'{f_true} Hz')
376 ax.set_xlabel('Frequency (Hz)')
377 ax.set_ylabel('PSD (dB re V²/Hz)')
378 ax.set_title('Zoom: 80–140 Hz (can periodogram resolve 100 & 120 Hz?)')
379 ax.legend(fontsize=8)
380 ax.grid(True, alpha=0.3)
381 ax.set_xlim(80, 140)
382
383 # (1,0) Bartlett vs Periodogram
384 ax = axes[1, 0]
385 ax.plot(f_pgram, to_db(P_pgram), 'gray', lw=0.8, alpha=0.6, label='Periodogram')
386 ax.plot(f_bart, to_db(P_bart), 'b-', lw=2,
387 label=f'Bartlett (K={K_bartlett}, Δf={FS/(N//K_bartlett):.1f} Hz)')
388 for f_true in TRUE_FREQS:
389 ax.axvline(f_true, color='r', ls='--', lw=1, alpha=0.7)
390 ax.set_xlabel('Frequency (Hz)')
391 ax.set_ylabel('PSD (dB re V²/Hz)')
392 ax.set_title(f"Bartlett: variance ↓ {K_bartlett}×, resolution ↓ {K_bartlett}×")
393 ax.legend(fontsize=9)
394 ax.grid(True, alpha=0.3)
395 ax.set_xlim(0, FS / 2)
396
397 # (1,1) Welch — two segment lengths
398 ax = axes[1, 1]
399 ax.plot(f_pgram, to_db(P_pgram), 'gray', lw=0.8, alpha=0.5, label='Periodogram')
400 wcolors = ['tab:blue', 'tab:orange']
401 for (label, _), color in zip(welch_configs, wcolors):
402 fw, Pw = welch_results[label]
403 ax.plot(fw, to_db(Pw), color=color, lw=1.8, label=label)
404 for f_true in TRUE_FREQS:
405 ax.axvline(f_true, color='r', ls='--', lw=1, alpha=0.7)
406 ax.set_xlabel('Frequency (Hz)')
407 ax.set_ylabel('PSD (dB re V²/Hz)')
408 ax.set_title('Welch: resolution vs. variance tradeoff')
409 ax.legend(fontsize=8)
410 ax.grid(True, alpha=0.3)
411 ax.set_xlim(0, FS / 2)
412
413 # (2,0) AR Yule-Walker vs Welch (full range)
414 ax = axes[2, 0]
415 f_w_large, P_w_large = welch_results[welch_configs[0][0]]
416 ax.plot(f_w_large, to_db(P_w_large), 'b-', lw=1.5, label='Welch (large seg)')
417 ax.plot(f_ar, to_db(P_ar), 'r--', lw=2,
418 label=f'AR Yule-Walker (p={AR_ORDER})')
419 for f_true in TRUE_FREQS:
420 ax.axvline(f_true, color='k', ls=':', lw=1.2, alpha=0.8)
421 ax.set_xlabel('Frequency (Hz)')
422 ax.set_ylabel('PSD (dB re V²/Hz)')
423 ax.set_title(f'AR(p={AR_ORDER}) vs Welch: parametric super-resolution')
424 ax.legend(fontsize=9)
425 ax.grid(True, alpha=0.3)
426 ax.set_xlim(0, FS / 2)
427
428 # (2,1) AR zoom — 80–140 Hz (resolution comparison)
429 ax = axes[2, 1]
430 ax.plot(f_pgram, to_db(P_pgram), 'gray', lw=1, alpha=0.6, label='Periodogram')
431 ax.plot(f_w_large, to_db(P_w_large), 'b-', lw=1.8, label='Welch (large seg)')
432 ax.plot(f_ar, to_db(P_ar), 'r--', lw=2, label=f'AR (p={AR_ORDER})')
433 for f_true in [100, 120]:
434 ax.axvline(f_true, color='k', ls=':', lw=1.5, alpha=0.8, label=f'{f_true} Hz')
435 ax.set_xlabel('Frequency (Hz)')
436 ax.set_ylabel('PSD (dB re V²/Hz)')
437 ax.set_title('Zoom 80–140 Hz: AR resolves 100 & 120 Hz; Welch may not')
438 ax.legend(fontsize=8)
439 ax.grid(True, alpha=0.3)
440 ax.set_xlim(80, 140)
441
442 plt.tight_layout()
443 out_path = '/opt/projects/01_Personal/03_Study/examples/Signal_Processing/12_spectral_estimation.png'
444 plt.savefig(out_path, dpi=150)
445 print(f"\nSaved visualization: {out_path}")
446 plt.close()
447
448 print("\n" + "=" * 65)
449 print("Key Takeaways:")
450 print(" - Periodogram: maximum resolution (Δf=fs/N), but high variance")
451 print(" - Bartlett: averaging K segments reduces variance by K, resolution K× worse")
452 print(" - Welch: 50% overlap + windowing — best practical non-parametric method")
453 print(" - AR / Yule-Walker: parametric, super-resolution for short records")
454 print(" - Resolution-variance tradeoff is fundamental (Heisenberg-like)")
455 print(" - AR order too small → merged peaks; too large → spurious peaks")
456 print("=" * 65)