11_multirate.py

Download
python 387 lines 15.4 KB
  1#!/usr/bin/env python3
  2"""
  3Multirate Signal Processing
  4============================
  5
  6This script demonstrates the core concepts of multirate DSP:
  7
  81. Downsampling (decimation) — with and without anti-aliasing filter
  92. Upsampling (interpolation) — with and without interpolation filter
 103. Aliasing visualization — what happens when Nyquist theorem is violated
 114. Rational rate conversion — resampling from 44100 Hz to 48000 Hz (L/M = 160/147)
 125. Polyphase view — why decimation and interpolation filters work
 13
 14Key Concepts:
 15    - Decimation by M : keep every M-th sample → Nyquist halves → aliasing possible
 16    - Interpolation by L : insert L-1 zeros, then lowpass → smooth the zeros
 17    - Rational resampling: upsample by L, lowpass, then downsample by M
 18    - Anti-aliasing filter cutoff must be ≤ π/M (normalized) before decimation
 19    - Interpolation filter cutoff must be ≤ π/L and gain = L after upsampling
 20
 21Practical note:
 22    scipy.signal.decimate  = lowpass filter + downsample (in one call)
 23    scipy.signal.resample  = FFT-based resampling (handles arbitrary ratios)
 24    scipy.signal.resample_poly = polyphase rational resampling (exact L/M)
 25
 26Author: Educational example for Signal Processing
 27License: MIT
 28"""
 29
 30import numpy as np
 31from scipy import signal
 32import matplotlib.pyplot as plt
 33
 34
 35# ============================================================================
 36# HELPERS
 37# ============================================================================
 38
 39def spectrum_db(x, fs, nfft=2048):
 40    """
 41    Compute one-sided power spectrum in dBFS.
 42
 43    Args:
 44        x   (ndarray): Input signal
 45        fs  (float)  : Sampling rate (Hz)
 46        nfft (int)   : FFT size
 47
 48    Returns:
 49        (freqs, mag_db): frequency axis (Hz) and magnitude (dBFS)
 50    """
 51    X = np.fft.rfft(x, n=nfft)
 52    mag = np.abs(X) / nfft
 53    mag_db = 20 * np.log10(np.maximum(mag, 1e-15))
 54    freqs = np.fft.rfftfreq(nfft, d=1.0 / fs)
 55    return freqs, mag_db
 56
 57
 58def make_test_signal(fs, duration=0.5, freqs_hz=None, amps=None, seed=0):
 59    """
 60    Construct a multi-tone signal.
 61
 62    Args:
 63        fs       (float): Sampling rate
 64        duration (float): Signal duration in seconds
 65        freqs_hz (list) : Sinusoid frequencies in Hz
 66        amps     (list) : Corresponding amplitudes
 67
 68    Returns:
 69        (t, x): time vector and signal
 70    """
 71    if freqs_hz is None:
 72        freqs_hz = [1000, 5000, 9000]
 73    if amps is None:
 74        amps = [1.0, 0.8, 0.6]
 75
 76    t = np.arange(0, duration, 1.0 / fs)
 77    x = np.zeros(len(t))
 78    for f, a in zip(freqs_hz, amps):
 79        x += a * np.sin(2 * np.pi * f * t)
 80    return t, x
 81
 82
 83# ============================================================================
 84# DEMO 1: DECIMATION (WITH AND WITHOUT ANTI-ALIASING)
 85# ============================================================================
 86
 87def demo_decimation():
 88    """
 89    Decimate a signal by factor M = 4, comparing:
 90      (a) naive downsampling — no anti-aliasing filter → aliasing
 91      (b) scipy.signal.decimate — built-in anti-aliasing Chebyshev lowpass
 92    """
 93    print("=" * 65)
 94    print("1. DECIMATION BY M=4  (44100 → 11025 Hz)")
 95    print("=" * 65)
 96
 97    fs = 44100
 98    M = 4                          # decimation factor
 99    fs_out = fs // M               # 11025 Hz
100    nyquist_out = fs_out / 2       # new Nyquist = 5512.5 Hz
101
102    # Test signal: three tones at 1 kHz (safe), 4 kHz (safe), 8 kHz (alias!)
103    # 8 kHz > nyquist_out → will alias to 8000 − 11025 = −3025 → 3025 Hz
104    freqs = [1000, 4000, 8000]
105    t, x = make_test_signal(fs, duration=0.2, freqs_hz=freqs, amps=[1.0, 0.8, 0.6])
106
107    print(f"  Input: fs={fs} Hz,  {freqs} Hz tones")
108    print(f"  Decimation factor M={M}  →  output fs={fs_out} Hz")
109    print(f"  New Nyquist = {nyquist_out} Hz  → tones above {nyquist_out} Hz WILL alias")
110
111    # (a) Naive downsampling — select every M-th sample, NO filtering
112    x_naive = x[::M]
113    t_naive = t[::M]
114
115    alias_freq = abs(freqs[-1] - fs_out)   # predicted aliased frequency
116    print(f"\n  (a) Naive downsampling: {freqs[-1]} Hz aliases to {alias_freq} Hz")
117
118    # (b) scipy.signal.decimate — applies Chebyshev I lowpass, then downsample
119    #     Default: order-8 Chebyshev I with cutoff at fs_out/2
120    x_decimated = signal.decimate(x, M, ftype='fir', zero_phase=True)
121    t_decimated = np.arange(len(x_decimated)) / fs_out
122
123    print(f"  (b) decimate(): anti-aliasing FIR lowpass → clean output")
124
125    # Power at the alias frequency after each method
126    for label, sig, sr in [('Naive', x_naive, fs_out),
127                             ('Decimated', x_decimated, fs_out)]:
128        _, mag_db = spectrum_db(sig, sr, nfft=4096)
129        freqs_axis = np.fft.rfftfreq(4096, d=1.0 / sr)
130        idx = np.argmin(np.abs(freqs_axis - alias_freq))
131        print(f"  {label:<12}: power at alias ({alias_freq} Hz) = "
132              f"{mag_db[idx]:.1f} dBFS")
133
134    return fs, fs_out, M, t, x, t_naive, x_naive, t_decimated, x_decimated, freqs
135
136
137# ============================================================================
138# DEMO 2: UPSAMPLING / INTERPOLATION
139# ============================================================================
140
141def demo_interpolation():
142    """
143    Upsample a signal by factor L = 4, comparing:
144      (a) zero insertion only — no interpolation filter → imaging
145      (b) scipy.signal.resample_poly with L, 1 — polyphase interpolation
146    """
147    print("\n" + "=" * 65)
148    print("2. INTERPOLATION BY L=4  (11025 → 44100 Hz)")
149    print("=" * 65)
150
151    fs_in = 11025
152    L = 4
153    fs_out = fs_in * L    # 44100 Hz
154
155    freqs = [1000, 4000]
156    t_in, x_in = make_test_signal(fs_in, duration=0.2, freqs_hz=freqs)
157
158    print(f"  Input: fs={fs_in} Hz,  {freqs} Hz tones")
159    print(f"  Interpolation factor L={L}  →  output fs={fs_out} Hz")
160
161    # (a) Zero insertion only: insert L-1 zeros between every sample
162    x_upsampled = np.zeros(len(x_in) * L)
163    x_upsampled[::L] = x_in   # place original samples at every L-th position
164    t_up = np.arange(len(x_upsampled)) / fs_out
165
166    print(f"\n  (a) Zero insertion: creates images at multiples of {fs_in} Hz")
167
168    # (b) Polyphase interpolation via resample_poly
169    x_interp = signal.resample_poly(x_in, L, 1)   # upsample by L, downsample by 1
170    t_interp = np.arange(len(x_interp)) / fs_out
171
172    print(f"  (b) resample_poly(L=4, M=1): built-in polyphase filter removes images")
173
174    # Check power in image band (around 11025 − 1000 = 10025 Hz)
175    image_freq = fs_in - freqs[0]
176    for label, sig, sr in [('Zero-insert', x_upsampled, fs_out),
177                             ('resample_poly', x_interp, fs_out)]:
178        _, mag_db = spectrum_db(sig, sr, nfft=8192)
179        freqs_axis = np.fft.rfftfreq(8192, d=1.0 / sr)
180        idx = np.argmin(np.abs(freqs_axis - image_freq))
181        print(f"  {label:<16}: image power at {image_freq} Hz = {mag_db[idx]:.1f} dBFS")
182
183    return fs_in, fs_out, L, t_in, x_in, t_up, x_upsampled, t_interp, x_interp
184
185
186# ============================================================================
187# DEMO 3: RATIONAL RATE CONVERSION  44100 → 48000 Hz
188# ============================================================================
189
190def demo_rate_conversion():
191    """
192    Convert from CD rate (44100 Hz) to professional audio (48000 Hz).
193
194    Rational ratio: 48000/44100 = 160/147  (L=160, M=147)
195    Process: upsample by 160, lowpass filter, downsample by 147.
196    scipy.signal.resample_poly handles this efficiently using a polyphase filter.
197    """
198    print("\n" + "=" * 65)
199    print("3. RATIONAL RATE CONVERSION  44100 Hz → 48000 Hz  (L/M = 160/147)")
200    print("=" * 65)
201
202    fs_in = 44100
203    fs_out = 48000
204    L, M = 160, 147    # exact rational ratio
205
206    duration = 0.05   # short segment — polyphase is memory-intensive for large L
207    freqs = [440, 4000, 10000]   # A4, ~mid, ~high
208    t_in, x_in = make_test_signal(fs_in, duration=duration, freqs_hz=freqs)
209    t_out_expected = np.arange(int(duration * fs_out)) / fs_out
210
211    print(f"  Input: {fs_in} Hz, {len(x_in)} samples")
212    print(f"  Ratio: {fs_out}/{fs_in} = {L}/{M}")
213
214    # scipy resample_poly: efficient polyphase implementation
215    x_resampled = signal.resample_poly(x_in, L, M)
216    print(f"  Output (resample_poly): {len(x_resampled)} samples  "
217          f"(expected ≈ {int(round(len(x_in) * L / M))})")
218
219    # FFT-based resample (scipy.signal.resample) — alternative
220    n_out = int(round(len(x_in) * fs_out / fs_in))
221    x_fft_rs = signal.resample(x_in, n_out)
222    print(f"  Output (FFT resample) : {len(x_fft_rs)} samples")
223
224    # Verify spectral content is preserved
225    print(f"\n  Verifying tonal content ({freqs} Hz) is preserved:")
226    _, mag_in_db = spectrum_db(x_in, fs_in, nfft=8192)
227    freqs_in = np.fft.rfftfreq(8192, d=1.0 / fs_in)
228    _, mag_out_db = spectrum_db(x_resampled, fs_out, nfft=8192)
229    freqs_out = np.fft.rfftfreq(8192, d=1.0 / fs_out)
230
231    print(f"  {'Freq':>6}  {'In (dBFS)':>12}  {'Out (dBFS)':>12}  {'Diff':>8}")
232    for f in freqs:
233        idx_in = np.argmin(np.abs(freqs_in - f))
234        idx_out = np.argmin(np.abs(freqs_out - f))
235        diff = mag_out_db[idx_out] - mag_in_db[idx_in]
236        print(f"  {f:>6} Hz  {mag_in_db[idx_in]:>10.1f}  "
237              f"{mag_out_db[idx_out]:>10.1f}  {diff:>+7.2f} dB")
238
239    return (fs_in, fs_out, t_in, x_in, x_resampled, x_fft_rs,
240            freqs_in, mag_in_db, freqs_out, mag_out_db)
241
242
243# ============================================================================
244# ENTRY POINT
245# ============================================================================
246
247if __name__ == "__main__":
248    print("MULTIRATE SIGNAL PROCESSING")
249
250    # Run demos and collect results
251    (fs, fs_dec, M, t, x, t_naive, x_naive,
252     t_dec, x_dec, sig_freqs) = demo_decimation()
253
254    (fs_in_up, fs_up, L, t_in_up, x_in_up,
255     t_zins, x_zins, t_interp, x_interp) = demo_interpolation()
256
257    (fs_conv_in, fs_conv_out, t_conv_in, x_conv_in,
258     x_conv_poly, x_conv_fft,
259     freqs_in, mag_in_db, freqs_out, mag_out_db) = demo_rate_conversion()
260
261    # -----------------------------------------------------------------------
262    # VISUALIZATION — 3×2 grid
263    # -----------------------------------------------------------------------
264    fig, axes = plt.subplots(3, 2, figsize=(13, 12))
265    fig.suptitle('Multirate Signal Processing', fontsize=13, fontweight='bold')
266
267    # (0,0) Decimation: spectra comparison
268    ax = axes[0, 0]
269    nfft_dec = 4096
270    f_orig, m_orig = spectrum_db(x, fs, nfft=nfft_dec)
271    f_naive, m_naive = spectrum_db(x_naive, fs_dec, nfft=nfft_dec)
272    f_dec, m_dec = spectrum_db(x_dec, fs_dec, nfft=nfft_dec)
273
274    ax.plot(f_orig, m_orig, 'gray', lw=1.2, alpha=0.7, label=f'Input ({fs} Hz)')
275    ax.plot(f_naive, m_naive, 'r-', lw=1.8, label='Naive downsample (aliased)')
276    ax.plot(f_dec, m_dec, 'b--', lw=1.8, label='decimate() anti-aliased')
277    ax.axvline(fs_dec / 2, color='k', ls=':', lw=1, label=f'New Nyquist ({fs_dec//2} Hz)')
278    ax.set_xlabel('Frequency (Hz)')
279    ax.set_ylabel('Magnitude (dBFS)')
280    ax.set_title(f'Decimation by M={M}: aliasing vs. anti-aliasing')
281    ax.legend(fontsize=8)
282    ax.grid(True, alpha=0.3)
283    ax.set_xlim(0, fs_dec / 2)
284    ax.set_ylim(-80, 10)
285
286    # (0,1) Decimation: time domain (first 5 ms)
287    ax = axes[0, 1]
288    n_show = int(0.005 * fs)
289    ax.plot(t[:n_show] * 1e3, x[:n_show], 'gray', lw=1, alpha=0.5, label='Input')
290    n_show_d = int(0.005 * fs_dec)
291    ax.plot(t_naive[:n_show_d] * 1e3, x_naive[:n_show_d],
292            'r-o', ms=4, lw=1.5, label='Naive (aliased)')
293    ax.plot(t_dec[:n_show_d] * 1e3, x_dec[:n_show_d],
294            'b--s', ms=4, lw=1.5, label='decimate() clean')
295    ax.set_xlabel('Time (ms)')
296    ax.set_ylabel('Amplitude')
297    ax.set_title('Decimation: time domain (first 5 ms)')
298    ax.legend(fontsize=8)
299    ax.grid(True, alpha=0.3)
300
301    # (1,0) Interpolation: spectra comparison
302    ax = axes[1, 0]
303    nfft_up = 8192
304    f_in_up, m_in_up = spectrum_db(x_in_up, fs_in_up, nfft=nfft_up)
305    f_zins, m_zins = spectrum_db(x_zins, fs_up, nfft=nfft_up)
306    f_interp, m_interp = spectrum_db(x_interp, fs_up, nfft=nfft_up)
307
308    ax.plot(f_in_up, m_in_up, 'gray', lw=1.2, alpha=0.7,
309            label=f'Input ({fs_in_up} Hz)')
310    ax.plot(f_zins, m_zins, 'r-', lw=1.5, label='Zero-insert (images visible)')
311    ax.plot(f_interp, m_interp, 'b--', lw=1.8, label='resample_poly (filtered)')
312    ax.axvline(fs_in_up / 2, color='k', ls=':', lw=1,
313               label=f'Old Nyquist ({fs_in_up//2} Hz)')
314    ax.set_xlabel('Frequency (Hz)')
315    ax.set_ylabel('Magnitude (dBFS)')
316    ax.set_title(f'Interpolation by L={L}: imaging vs. polyphase')
317    ax.legend(fontsize=8)
318    ax.grid(True, alpha=0.3)
319    ax.set_xlim(0, fs_up / 2)
320    ax.set_ylim(-80, 10)
321
322    # (1,1) Interpolation: time domain (first 5 ms)
323    ax = axes[1, 1]
324    n_show_in = int(0.005 * fs_in_up)
325    ax.plot(t_in_up[:n_show_in] * 1e3, x_in_up[:n_show_in],
326            'ko', ms=5, label=f'Input samples ({fs_in_up} Hz)')
327    n_show_up = int(0.005 * fs_up)
328    ax.plot(np.arange(n_show_up) / fs_up * 1e3, x_zins[:n_show_up],
329            'r-', lw=1.2, alpha=0.7, label='Zero-insert (jagged)')
330    ax.plot(np.arange(len(x_interp[:n_show_up])) / fs_up * 1e3,
331            x_interp[:n_show_up], 'b--', lw=1.8, label='Interpolated (smooth)')
332    ax.set_xlabel('Time (ms)')
333    ax.set_ylabel('Amplitude')
334    ax.set_title('Interpolation: time domain (first 5 ms)')
335    ax.legend(fontsize=8)
336    ax.grid(True, alpha=0.3)
337
338    # (2,0) Rate conversion: input vs output spectra
339    ax = axes[2, 0]
340    ax.plot(freqs_in, mag_in_db, 'gray', lw=1.5, alpha=0.8,
341            label=f'Input ({fs_conv_in} Hz)')
342    ax.plot(freqs_out, mag_out_db, 'b--', lw=1.8,
343            label=f'resample_poly ({fs_conv_out} Hz)')
344    _, mag_fft_db = spectrum_db(x_conv_fft, fs_conv_out, nfft=8192)
345    freqs_fft = np.fft.rfftfreq(8192, d=1.0 / fs_conv_out)
346    ax.plot(freqs_fft, mag_fft_db, 'r:', lw=1.5,
347            label=f'FFT resample ({fs_conv_out} Hz)')
348    ax.set_xlabel('Frequency (Hz)')
349    ax.set_ylabel('Magnitude (dBFS)')
350    ax.set_title(f'Rate Conversion {fs_conv_in}{fs_conv_out} Hz (L/M=160/147)')
351    ax.legend(fontsize=8)
352    ax.grid(True, alpha=0.3)
353    ax.set_xlim(0, 22050)
354    ax.set_ylim(-80, 10)
355
356    # (2,1) Rate conversion: time domain overlay (first 5 ms)
357    ax = axes[2, 1]
358    n_in_show = int(0.005 * fs_conv_in)
359    n_out_show = int(0.005 * fs_conv_out)
360    t_in_axis = np.arange(n_in_show) / fs_conv_in * 1e3
361    t_out_axis = np.arange(n_out_show) / fs_conv_out * 1e3
362    ax.plot(t_in_axis, x_conv_in[:n_in_show], 'ko', ms=3, alpha=0.6,
363            label=f'Input ({fs_conv_in} Hz)')
364    ax.plot(t_out_axis, x_conv_poly[:n_out_show], 'b-', lw=1.8,
365            label=f'resample_poly ({fs_conv_out} Hz)')
366    ax.set_xlabel('Time (ms)')
367    ax.set_ylabel('Amplitude')
368    ax.set_title('Rate Conversion: time domain (first 5 ms)')
369    ax.legend(fontsize=8)
370    ax.grid(True, alpha=0.3)
371
372    plt.tight_layout()
373    out_path = '/opt/projects/01_Personal/03_Study/examples/Signal_Processing/11_multirate.png'
374    plt.savefig(out_path, dpi=150)
375    print(f"\nSaved visualization: {out_path}")
376    plt.close()
377
378    print("\n" + "=" * 65)
379    print("Key Takeaways:")
380    print("  - Decimation without anti-aliasing causes aliasing (folding)")
381    print("  - Anti-aliasing cutoff must be ≤ fs_out/2 BEFORE downsampling")
382    print("  - Upsampling without lowpass causes spectral images (copies)")
383    print("  - Interpolation filter gain = L to restore original amplitude")
384    print("  - Rational resampling: upsample L, lowpass at min(π/L, π/M), downsample M")
385    print("  - resample_poly is memory-efficient; resample uses FFT (exact length)")
386    print("=" * 65)