1#!/usr/bin/env python3
2"""
3Multirate Signal Processing
4============================
5
6This script demonstrates the core concepts of multirate DSP:
7
81. Downsampling (decimation) — with and without anti-aliasing filter
92. Upsampling (interpolation) — with and without interpolation filter
103. Aliasing visualization — what happens when Nyquist theorem is violated
114. Rational rate conversion — resampling from 44100 Hz to 48000 Hz (L/M = 160/147)
125. Polyphase view — why decimation and interpolation filters work
13
14Key Concepts:
15 - Decimation by M : keep every M-th sample → Nyquist halves → aliasing possible
16 - Interpolation by L : insert L-1 zeros, then lowpass → smooth the zeros
17 - Rational resampling: upsample by L, lowpass, then downsample by M
18 - Anti-aliasing filter cutoff must be ≤ π/M (normalized) before decimation
19 - Interpolation filter cutoff must be ≤ π/L and gain = L after upsampling
20
21Practical note:
22 scipy.signal.decimate = lowpass filter + downsample (in one call)
23 scipy.signal.resample = FFT-based resampling (handles arbitrary ratios)
24 scipy.signal.resample_poly = polyphase rational resampling (exact L/M)
25
26Author: Educational example for Signal Processing
27License: MIT
28"""
29
30import numpy as np
31from scipy import signal
32import matplotlib.pyplot as plt
33
34
35# ============================================================================
36# HELPERS
37# ============================================================================
38
39def spectrum_db(x, fs, nfft=2048):
40 """
41 Compute one-sided power spectrum in dBFS.
42
43 Args:
44 x (ndarray): Input signal
45 fs (float) : Sampling rate (Hz)
46 nfft (int) : FFT size
47
48 Returns:
49 (freqs, mag_db): frequency axis (Hz) and magnitude (dBFS)
50 """
51 X = np.fft.rfft(x, n=nfft)
52 mag = np.abs(X) / nfft
53 mag_db = 20 * np.log10(np.maximum(mag, 1e-15))
54 freqs = np.fft.rfftfreq(nfft, d=1.0 / fs)
55 return freqs, mag_db
56
57
58def make_test_signal(fs, duration=0.5, freqs_hz=None, amps=None, seed=0):
59 """
60 Construct a multi-tone signal.
61
62 Args:
63 fs (float): Sampling rate
64 duration (float): Signal duration in seconds
65 freqs_hz (list) : Sinusoid frequencies in Hz
66 amps (list) : Corresponding amplitudes
67
68 Returns:
69 (t, x): time vector and signal
70 """
71 if freqs_hz is None:
72 freqs_hz = [1000, 5000, 9000]
73 if amps is None:
74 amps = [1.0, 0.8, 0.6]
75
76 t = np.arange(0, duration, 1.0 / fs)
77 x = np.zeros(len(t))
78 for f, a in zip(freqs_hz, amps):
79 x += a * np.sin(2 * np.pi * f * t)
80 return t, x
81
82
83# ============================================================================
84# DEMO 1: DECIMATION (WITH AND WITHOUT ANTI-ALIASING)
85# ============================================================================
86
87def demo_decimation():
88 """
89 Decimate a signal by factor M = 4, comparing:
90 (a) naive downsampling — no anti-aliasing filter → aliasing
91 (b) scipy.signal.decimate — built-in anti-aliasing Chebyshev lowpass
92 """
93 print("=" * 65)
94 print("1. DECIMATION BY M=4 (44100 → 11025 Hz)")
95 print("=" * 65)
96
97 fs = 44100
98 M = 4 # decimation factor
99 fs_out = fs // M # 11025 Hz
100 nyquist_out = fs_out / 2 # new Nyquist = 5512.5 Hz
101
102 # Test signal: three tones at 1 kHz (safe), 4 kHz (safe), 8 kHz (alias!)
103 # 8 kHz > nyquist_out → will alias to 8000 − 11025 = −3025 → 3025 Hz
104 freqs = [1000, 4000, 8000]
105 t, x = make_test_signal(fs, duration=0.2, freqs_hz=freqs, amps=[1.0, 0.8, 0.6])
106
107 print(f" Input: fs={fs} Hz, {freqs} Hz tones")
108 print(f" Decimation factor M={M} → output fs={fs_out} Hz")
109 print(f" New Nyquist = {nyquist_out} Hz → tones above {nyquist_out} Hz WILL alias")
110
111 # (a) Naive downsampling — select every M-th sample, NO filtering
112 x_naive = x[::M]
113 t_naive = t[::M]
114
115 alias_freq = abs(freqs[-1] - fs_out) # predicted aliased frequency
116 print(f"\n (a) Naive downsampling: {freqs[-1]} Hz aliases to {alias_freq} Hz")
117
118 # (b) scipy.signal.decimate — applies Chebyshev I lowpass, then downsample
119 # Default: order-8 Chebyshev I with cutoff at fs_out/2
120 x_decimated = signal.decimate(x, M, ftype='fir', zero_phase=True)
121 t_decimated = np.arange(len(x_decimated)) / fs_out
122
123 print(f" (b) decimate(): anti-aliasing FIR lowpass → clean output")
124
125 # Power at the alias frequency after each method
126 for label, sig, sr in [('Naive', x_naive, fs_out),
127 ('Decimated', x_decimated, fs_out)]:
128 _, mag_db = spectrum_db(sig, sr, nfft=4096)
129 freqs_axis = np.fft.rfftfreq(4096, d=1.0 / sr)
130 idx = np.argmin(np.abs(freqs_axis - alias_freq))
131 print(f" {label:<12}: power at alias ({alias_freq} Hz) = "
132 f"{mag_db[idx]:.1f} dBFS")
133
134 return fs, fs_out, M, t, x, t_naive, x_naive, t_decimated, x_decimated, freqs
135
136
137# ============================================================================
138# DEMO 2: UPSAMPLING / INTERPOLATION
139# ============================================================================
140
141def demo_interpolation():
142 """
143 Upsample a signal by factor L = 4, comparing:
144 (a) zero insertion only — no interpolation filter → imaging
145 (b) scipy.signal.resample_poly with L, 1 — polyphase interpolation
146 """
147 print("\n" + "=" * 65)
148 print("2. INTERPOLATION BY L=4 (11025 → 44100 Hz)")
149 print("=" * 65)
150
151 fs_in = 11025
152 L = 4
153 fs_out = fs_in * L # 44100 Hz
154
155 freqs = [1000, 4000]
156 t_in, x_in = make_test_signal(fs_in, duration=0.2, freqs_hz=freqs)
157
158 print(f" Input: fs={fs_in} Hz, {freqs} Hz tones")
159 print(f" Interpolation factor L={L} → output fs={fs_out} Hz")
160
161 # (a) Zero insertion only: insert L-1 zeros between every sample
162 x_upsampled = np.zeros(len(x_in) * L)
163 x_upsampled[::L] = x_in # place original samples at every L-th position
164 t_up = np.arange(len(x_upsampled)) / fs_out
165
166 print(f"\n (a) Zero insertion: creates images at multiples of {fs_in} Hz")
167
168 # (b) Polyphase interpolation via resample_poly
169 x_interp = signal.resample_poly(x_in, L, 1) # upsample by L, downsample by 1
170 t_interp = np.arange(len(x_interp)) / fs_out
171
172 print(f" (b) resample_poly(L=4, M=1): built-in polyphase filter removes images")
173
174 # Check power in image band (around 11025 − 1000 = 10025 Hz)
175 image_freq = fs_in - freqs[0]
176 for label, sig, sr in [('Zero-insert', x_upsampled, fs_out),
177 ('resample_poly', x_interp, fs_out)]:
178 _, mag_db = spectrum_db(sig, sr, nfft=8192)
179 freqs_axis = np.fft.rfftfreq(8192, d=1.0 / sr)
180 idx = np.argmin(np.abs(freqs_axis - image_freq))
181 print(f" {label:<16}: image power at {image_freq} Hz = {mag_db[idx]:.1f} dBFS")
182
183 return fs_in, fs_out, L, t_in, x_in, t_up, x_upsampled, t_interp, x_interp
184
185
186# ============================================================================
187# DEMO 3: RATIONAL RATE CONVERSION 44100 → 48000 Hz
188# ============================================================================
189
190def demo_rate_conversion():
191 """
192 Convert from CD rate (44100 Hz) to professional audio (48000 Hz).
193
194 Rational ratio: 48000/44100 = 160/147 (L=160, M=147)
195 Process: upsample by 160, lowpass filter, downsample by 147.
196 scipy.signal.resample_poly handles this efficiently using a polyphase filter.
197 """
198 print("\n" + "=" * 65)
199 print("3. RATIONAL RATE CONVERSION 44100 Hz → 48000 Hz (L/M = 160/147)")
200 print("=" * 65)
201
202 fs_in = 44100
203 fs_out = 48000
204 L, M = 160, 147 # exact rational ratio
205
206 duration = 0.05 # short segment — polyphase is memory-intensive for large L
207 freqs = [440, 4000, 10000] # A4, ~mid, ~high
208 t_in, x_in = make_test_signal(fs_in, duration=duration, freqs_hz=freqs)
209 t_out_expected = np.arange(int(duration * fs_out)) / fs_out
210
211 print(f" Input: {fs_in} Hz, {len(x_in)} samples")
212 print(f" Ratio: {fs_out}/{fs_in} = {L}/{M}")
213
214 # scipy resample_poly: efficient polyphase implementation
215 x_resampled = signal.resample_poly(x_in, L, M)
216 print(f" Output (resample_poly): {len(x_resampled)} samples "
217 f"(expected ≈ {int(round(len(x_in) * L / M))})")
218
219 # FFT-based resample (scipy.signal.resample) — alternative
220 n_out = int(round(len(x_in) * fs_out / fs_in))
221 x_fft_rs = signal.resample(x_in, n_out)
222 print(f" Output (FFT resample) : {len(x_fft_rs)} samples")
223
224 # Verify spectral content is preserved
225 print(f"\n Verifying tonal content ({freqs} Hz) is preserved:")
226 _, mag_in_db = spectrum_db(x_in, fs_in, nfft=8192)
227 freqs_in = np.fft.rfftfreq(8192, d=1.0 / fs_in)
228 _, mag_out_db = spectrum_db(x_resampled, fs_out, nfft=8192)
229 freqs_out = np.fft.rfftfreq(8192, d=1.0 / fs_out)
230
231 print(f" {'Freq':>6} {'In (dBFS)':>12} {'Out (dBFS)':>12} {'Diff':>8}")
232 for f in freqs:
233 idx_in = np.argmin(np.abs(freqs_in - f))
234 idx_out = np.argmin(np.abs(freqs_out - f))
235 diff = mag_out_db[idx_out] - mag_in_db[idx_in]
236 print(f" {f:>6} Hz {mag_in_db[idx_in]:>10.1f} "
237 f"{mag_out_db[idx_out]:>10.1f} {diff:>+7.2f} dB")
238
239 return (fs_in, fs_out, t_in, x_in, x_resampled, x_fft_rs,
240 freqs_in, mag_in_db, freqs_out, mag_out_db)
241
242
243# ============================================================================
244# ENTRY POINT
245# ============================================================================
246
247if __name__ == "__main__":
248 print("MULTIRATE SIGNAL PROCESSING")
249
250 # Run demos and collect results
251 (fs, fs_dec, M, t, x, t_naive, x_naive,
252 t_dec, x_dec, sig_freqs) = demo_decimation()
253
254 (fs_in_up, fs_up, L, t_in_up, x_in_up,
255 t_zins, x_zins, t_interp, x_interp) = demo_interpolation()
256
257 (fs_conv_in, fs_conv_out, t_conv_in, x_conv_in,
258 x_conv_poly, x_conv_fft,
259 freqs_in, mag_in_db, freqs_out, mag_out_db) = demo_rate_conversion()
260
261 # -----------------------------------------------------------------------
262 # VISUALIZATION — 3×2 grid
263 # -----------------------------------------------------------------------
264 fig, axes = plt.subplots(3, 2, figsize=(13, 12))
265 fig.suptitle('Multirate Signal Processing', fontsize=13, fontweight='bold')
266
267 # (0,0) Decimation: spectra comparison
268 ax = axes[0, 0]
269 nfft_dec = 4096
270 f_orig, m_orig = spectrum_db(x, fs, nfft=nfft_dec)
271 f_naive, m_naive = spectrum_db(x_naive, fs_dec, nfft=nfft_dec)
272 f_dec, m_dec = spectrum_db(x_dec, fs_dec, nfft=nfft_dec)
273
274 ax.plot(f_orig, m_orig, 'gray', lw=1.2, alpha=0.7, label=f'Input ({fs} Hz)')
275 ax.plot(f_naive, m_naive, 'r-', lw=1.8, label='Naive downsample (aliased)')
276 ax.plot(f_dec, m_dec, 'b--', lw=1.8, label='decimate() anti-aliased')
277 ax.axvline(fs_dec / 2, color='k', ls=':', lw=1, label=f'New Nyquist ({fs_dec//2} Hz)')
278 ax.set_xlabel('Frequency (Hz)')
279 ax.set_ylabel('Magnitude (dBFS)')
280 ax.set_title(f'Decimation by M={M}: aliasing vs. anti-aliasing')
281 ax.legend(fontsize=8)
282 ax.grid(True, alpha=0.3)
283 ax.set_xlim(0, fs_dec / 2)
284 ax.set_ylim(-80, 10)
285
286 # (0,1) Decimation: time domain (first 5 ms)
287 ax = axes[0, 1]
288 n_show = int(0.005 * fs)
289 ax.plot(t[:n_show] * 1e3, x[:n_show], 'gray', lw=1, alpha=0.5, label='Input')
290 n_show_d = int(0.005 * fs_dec)
291 ax.plot(t_naive[:n_show_d] * 1e3, x_naive[:n_show_d],
292 'r-o', ms=4, lw=1.5, label='Naive (aliased)')
293 ax.plot(t_dec[:n_show_d] * 1e3, x_dec[:n_show_d],
294 'b--s', ms=4, lw=1.5, label='decimate() clean')
295 ax.set_xlabel('Time (ms)')
296 ax.set_ylabel('Amplitude')
297 ax.set_title('Decimation: time domain (first 5 ms)')
298 ax.legend(fontsize=8)
299 ax.grid(True, alpha=0.3)
300
301 # (1,0) Interpolation: spectra comparison
302 ax = axes[1, 0]
303 nfft_up = 8192
304 f_in_up, m_in_up = spectrum_db(x_in_up, fs_in_up, nfft=nfft_up)
305 f_zins, m_zins = spectrum_db(x_zins, fs_up, nfft=nfft_up)
306 f_interp, m_interp = spectrum_db(x_interp, fs_up, nfft=nfft_up)
307
308 ax.plot(f_in_up, m_in_up, 'gray', lw=1.2, alpha=0.7,
309 label=f'Input ({fs_in_up} Hz)')
310 ax.plot(f_zins, m_zins, 'r-', lw=1.5, label='Zero-insert (images visible)')
311 ax.plot(f_interp, m_interp, 'b--', lw=1.8, label='resample_poly (filtered)')
312 ax.axvline(fs_in_up / 2, color='k', ls=':', lw=1,
313 label=f'Old Nyquist ({fs_in_up//2} Hz)')
314 ax.set_xlabel('Frequency (Hz)')
315 ax.set_ylabel('Magnitude (dBFS)')
316 ax.set_title(f'Interpolation by L={L}: imaging vs. polyphase')
317 ax.legend(fontsize=8)
318 ax.grid(True, alpha=0.3)
319 ax.set_xlim(0, fs_up / 2)
320 ax.set_ylim(-80, 10)
321
322 # (1,1) Interpolation: time domain (first 5 ms)
323 ax = axes[1, 1]
324 n_show_in = int(0.005 * fs_in_up)
325 ax.plot(t_in_up[:n_show_in] * 1e3, x_in_up[:n_show_in],
326 'ko', ms=5, label=f'Input samples ({fs_in_up} Hz)')
327 n_show_up = int(0.005 * fs_up)
328 ax.plot(np.arange(n_show_up) / fs_up * 1e3, x_zins[:n_show_up],
329 'r-', lw=1.2, alpha=0.7, label='Zero-insert (jagged)')
330 ax.plot(np.arange(len(x_interp[:n_show_up])) / fs_up * 1e3,
331 x_interp[:n_show_up], 'b--', lw=1.8, label='Interpolated (smooth)')
332 ax.set_xlabel('Time (ms)')
333 ax.set_ylabel('Amplitude')
334 ax.set_title('Interpolation: time domain (first 5 ms)')
335 ax.legend(fontsize=8)
336 ax.grid(True, alpha=0.3)
337
338 # (2,0) Rate conversion: input vs output spectra
339 ax = axes[2, 0]
340 ax.plot(freqs_in, mag_in_db, 'gray', lw=1.5, alpha=0.8,
341 label=f'Input ({fs_conv_in} Hz)')
342 ax.plot(freqs_out, mag_out_db, 'b--', lw=1.8,
343 label=f'resample_poly ({fs_conv_out} Hz)')
344 _, mag_fft_db = spectrum_db(x_conv_fft, fs_conv_out, nfft=8192)
345 freqs_fft = np.fft.rfftfreq(8192, d=1.0 / fs_conv_out)
346 ax.plot(freqs_fft, mag_fft_db, 'r:', lw=1.5,
347 label=f'FFT resample ({fs_conv_out} Hz)')
348 ax.set_xlabel('Frequency (Hz)')
349 ax.set_ylabel('Magnitude (dBFS)')
350 ax.set_title(f'Rate Conversion {fs_conv_in}→{fs_conv_out} Hz (L/M=160/147)')
351 ax.legend(fontsize=8)
352 ax.grid(True, alpha=0.3)
353 ax.set_xlim(0, 22050)
354 ax.set_ylim(-80, 10)
355
356 # (2,1) Rate conversion: time domain overlay (first 5 ms)
357 ax = axes[2, 1]
358 n_in_show = int(0.005 * fs_conv_in)
359 n_out_show = int(0.005 * fs_conv_out)
360 t_in_axis = np.arange(n_in_show) / fs_conv_in * 1e3
361 t_out_axis = np.arange(n_out_show) / fs_conv_out * 1e3
362 ax.plot(t_in_axis, x_conv_in[:n_in_show], 'ko', ms=3, alpha=0.6,
363 label=f'Input ({fs_conv_in} Hz)')
364 ax.plot(t_out_axis, x_conv_poly[:n_out_show], 'b-', lw=1.8,
365 label=f'resample_poly ({fs_conv_out} Hz)')
366 ax.set_xlabel('Time (ms)')
367 ax.set_ylabel('Amplitude')
368 ax.set_title('Rate Conversion: time domain (first 5 ms)')
369 ax.legend(fontsize=8)
370 ax.grid(True, alpha=0.3)
371
372 plt.tight_layout()
373 out_path = '/opt/projects/01_Personal/03_Study/examples/Signal_Processing/11_multirate.png'
374 plt.savefig(out_path, dpi=150)
375 print(f"\nSaved visualization: {out_path}")
376 plt.close()
377
378 print("\n" + "=" * 65)
379 print("Key Takeaways:")
380 print(" - Decimation without anti-aliasing causes aliasing (folding)")
381 print(" - Anti-aliasing cutoff must be ≤ fs_out/2 BEFORE downsampling")
382 print(" - Upsampling without lowpass causes spectral images (copies)")
383 print(" - Interpolation filter gain = L to restore original amplitude")
384 print(" - Rational resampling: upsample L, lowpass at min(π/L, π/M), downsample M")
385 print(" - resample_poly is memory-efficient; resample uses FFT (exact length)")
386 print("=" * 65)