02_convolution.py

Download
python 363 lines 11.6 KB
  1#!/usr/bin/env python3
  2"""
  3LTI Systems and Convolution
  4
  5Demonstrates linear time-invariant (LTI) system concepts:
  6- Discrete convolution implemented from scratch
  7- Comparison with np.convolve for verification
  8- LTI system response: output = input * impulse_response
  9- Moving average filter as a convolution example
 10- Step-by-step visualization of the sliding-window convolution
 11- Commutativity, associativity, and distributivity properties
 12- RC circuit impulse response and system output
 13
 14Dependencies: numpy, matplotlib
 15"""
 16
 17import numpy as np
 18import matplotlib.pyplot as plt
 19
 20
 21def convolve_scratch(x: np.ndarray, h: np.ndarray) -> np.ndarray:
 22    """
 23    Compute discrete linear convolution of x and h from scratch.
 24
 25    The convolution sum:
 26        y[n] = sum_{k=-inf}^{inf} x[k] * h[n - k]
 27
 28    For finite-length sequences of length N and M,
 29    the output has length N + M - 1.
 30
 31    Parameters
 32    ----------
 33    x : input signal, length N
 34    h : impulse response (kernel), length M
 35
 36    Returns
 37    -------
 38    y : convolution output, length N + M - 1
 39    """
 40    N = len(x)
 41    M = len(h)
 42    L = N + M - 1               # output length
 43    y = np.zeros(L)
 44
 45    for n in range(L):
 46        # Accumulate x[k] * h[n - k] for valid k values
 47        for k in range(N):
 48            m = n - k           # index into h
 49            if 0 <= m < M:
 50                y[n] += x[k] * h[m]
 51    return y
 52
 53
 54def demo_scratch_vs_numpy():
 55    """Verify the from-scratch convolution against np.convolve."""
 56    print("=" * 60)
 57    print("CONVOLUTION: FROM SCRATCH vs np.convolve")
 58    print("=" * 60)
 59
 60    x = np.array([1, 2, 3, 4, 5], dtype=float)
 61    h = np.array([1, 0, -1], dtype=float)   # simple difference filter
 62
 63    y_scratch = convolve_scratch(x, h)
 64    y_numpy   = np.convolve(x, h)            # 'full' mode by default
 65
 66    print(f"Input x     = {x}")
 67    print(f"Kernel h    = {h}")
 68    print(f"y (scratch) = {y_scratch}")
 69    print(f"y (numpy)   = {y_numpy}")
 70    print(f"Max absolute error = {np.max(np.abs(y_scratch - y_numpy)):.2e}")
 71    print(f"Results match: {np.allclose(y_scratch, y_numpy)}")
 72
 73
 74def demo_lti_response():
 75    """
 76    LTI system response: y = x * h
 77
 78    A system is LTI if it satisfies:
 79      - Linearity:   response to a*x1 + b*x2 is a*y1 + b*y2
 80      - Time-invariance: if x[n] -> y[n], then x[n-k] -> y[n-k]
 81
 82    The output of any LTI system is completely described by its
 83    impulse response h[n] via convolution: y[n] = (x * h)[n].
 84    """
 85    print("\n" + "=" * 60)
 86    print("LTI SYSTEM RESPONSE")
 87    print("=" * 60)
 88
 89    n = np.arange(0, 20)
 90
 91    # Input: x[n] = u[n] (unit step)
 92    x = (n >= 0).astype(float)
 93
 94    # Impulse response: h[n] = (0.7)^n * u[n]  (first-order IIR)
 95    alpha = 0.7
 96    h = alpha ** n * (n >= 0).astype(float)
 97
 98    # Output via convolution
 99    y = np.convolve(x, h)[:len(n)]
100
101    # Analytical step response: y[n] = (1 - alpha^(n+1)) / (1 - alpha)
102    y_analytical = (1 - alpha ** (n + 1)) / (1 - alpha)
103
104    print(f"Input x[n] = u[n] (unit step)")
105    print(f"Impulse response h[n] = ({alpha})^n · u[n]")
106    print(f"System: y[n] = sum_k x[k]·h[n-k]")
107    print(f"Max error vs analytical = {np.max(np.abs(y - y_analytical)):.2e}")
108
109    fig, axes = plt.subplots(3, 1, figsize=(9, 9))
110    fig.suptitle("LTI System Response", fontsize=13, fontweight='bold')
111
112    axes[0].stem(n, x, basefmt='k-', linefmt='C0-', markerfmt='C0o')
113    axes[0].set_title("Input x[n] = u[n] (Unit Step)")
114    axes[0].set_ylabel("Amplitude")
115
116    axes[1].stem(n, h, basefmt='k-', linefmt='C1-', markerfmt='C1o')
117    axes[1].set_title(f"Impulse Response h[n] = ({alpha})^n · u[n]")
118    axes[1].set_ylabel("Amplitude")
119
120    axes[2].stem(n, y, basefmt='k-', linefmt='C2-', markerfmt='C2o')
121    axes[2].plot(n, y_analytical, 'r--', label='Analytical')
122    axes[2].set_title("Output y[n] = x[n] * h[n]")
123    axes[2].set_ylabel("Amplitude")
124    axes[2].set_xlabel("n (samples)")
125    axes[2].legend()
126
127    for ax in axes:
128        ax.grid(True, alpha=0.3)
129        ax.set_xlabel("n (samples)")
130
131    plt.tight_layout()
132    plt.show()
133
134
135def demo_moving_average():
136    """
137    Moving average filter as convolution.
138
139    A length-M moving average is a FIR (Finite Impulse Response) filter
140    with impulse response h[n] = 1/M for 0 <= n < M, else 0.
141
142    Convolving a noisy signal with h smooths out rapid fluctuations
143    (acts as a low-pass filter).
144    """
145    print("\n" + "=" * 60)
146    print("MOVING AVERAGE FILTER (CONVOLUTION EXAMPLE)")
147    print("=" * 60)
148
149    rng = np.random.default_rng(0)
150    n = np.arange(0, 100)
151    clean   = np.sin(2 * np.pi * 0.05 * n)
152    noisy   = clean + 0.5 * rng.standard_normal(len(n))
153
154    M = 7                           # filter length
155    h_ma = np.ones(M) / M          # impulse response of moving average
156
157    # 'same' mode keeps output length equal to input
158    filtered = np.convolve(noisy, h_ma, mode='same')
159
160    mse_noisy    = np.mean((noisy    - clean) ** 2)
161    mse_filtered = np.mean((filtered - clean) ** 2)
162    print(f"Filter length M = {M}")
163    print(f"MSE (noisy vs clean)    = {mse_noisy:.4f}")
164    print(f"MSE (filtered vs clean) = {mse_filtered:.4f}")
165    print(f"Noise reduction factor  = {mse_noisy / mse_filtered:.2f}x")
166
167    fig, ax = plt.subplots(figsize=(11, 4))
168    ax.plot(n, noisy,    'lightblue', linewidth=1,   label='Noisy signal')
169    ax.plot(n, clean,    'gray',      linewidth=2,   label='Clean signal', linestyle='--')
170    ax.plot(n, filtered, 'red',       linewidth=2,   label=f'Moving average (M={M})')
171    ax.set_title("Moving Average Filter via Convolution")
172    ax.set_xlabel("n (samples)")
173    ax.set_ylabel("Amplitude")
174    ax.legend()
175    ax.grid(True, alpha=0.3)
176    plt.tight_layout()
177    plt.show()
178
179
180def demo_convolution_stepwise():
181    """
182    Visualize convolution as a sliding-window operation.
183
184    At each output index n, we:
185      1. Flip (reverse) h to get h[-k].
186      2. Shift h[-k] by n to get h[n-k].
187      3. Multiply element-wise with x[k].
188      4. Sum all products to get y[n].
189    """
190    print("\n" + "=" * 60)
191    print("STEP-BY-STEP CONVOLUTION VISUALIZATION")
192    print("=" * 60)
193
194    x = np.array([1, 2, 1, 3, 1], dtype=float)
195    h = np.array([1, 1, 1], dtype=float)   # 3-point moving sum
196    N, M = len(x), len(h)
197    L = N + M - 1
198
199    y = np.zeros(L)
200    n_steps = [1, 2, 4]   # which output indices to visualize
201
202    fig, axes = plt.subplots(len(n_steps), 3, figsize=(13, 8))
203    fig.suptitle("Convolution Step-by-Step (x*h)[n] = Σ x[k]·h[n-k]",
204                 fontsize=12, fontweight='bold')
205
206    # Pad x and flipped h into full output-length arrays for visualization
207    x_padded = np.concatenate([x, np.zeros(M - 1)])
208
209    for row, n_idx in enumerate(n_steps):
210        # h reversed and shifted: h[n-k] for k=0..L-1
211        h_flipped = np.zeros(L)
212        for k in range(L):
213            m = n_idx - k
214            if 0 <= m < M:
215                h_flipped[k] = h[m]
216
217        product = x_padded * h_flipped
218        y[n_idx] = np.sum(product)
219
220        k_ax = np.arange(L)
221        axes[row, 0].stem(k_ax, x_padded,  basefmt='k-', linefmt='C0-', markerfmt='C0o')
222        axes[row, 0].set_title(f"x[k]  (n={n_idx})")
223
224        axes[row, 1].stem(k_ax, h_flipped, basefmt='k-', linefmt='C1-', markerfmt='C1o')
225        axes[row, 1].set_title(f"h[{n_idx}-k]  (flipped & shifted h)")
226
227        axes[row, 2].stem(k_ax, product,   basefmt='k-', linefmt='C2-', markerfmt='C2o')
228        axes[row, 2].set_title(f"x[k]·h[{n_idx}-k], sum={y[n_idx]:.1f} = y[{n_idx}]")
229
230    for ax in axes.flat:
231        ax.set_xlabel("k")
232        ax.grid(True, alpha=0.3)
233
234    plt.tight_layout()
235    plt.show()
236
237    # Compute all outputs and compare
238    y_full = np.convolve(x, h)
239    print(f"x = {x}")
240    print(f"h = {h}")
241    print(f"y = x*h = {y_full}")
242
243
244def demo_properties():
245    """
246    Verify convolution properties:
247      1. Commutativity:   x * h = h * x
248      2. Associativity:   (x * h1) * h2 = x * (h1 * h2)
249      3. Distributivity:  x * (h1 + h2) = x*h1 + x*h2
250    """
251    print("\n" + "=" * 60)
252    print("CONVOLUTION PROPERTIES")
253    print("=" * 60)
254
255    x  = np.array([1, 2, 3, 4], dtype=float)
256    h1 = np.array([1, -1],      dtype=float)
257    h2 = np.array([1, 0, 1],    dtype=float)
258
259    # 1. Commutativity
260    lhs = np.convolve(x, h1)
261    rhs = np.convolve(h1, x)
262    print(f"1. Commutativity  x*h1 = h1*x:       {np.allclose(lhs, rhs)}")
263
264    # 2. Associativity
265    lhs = np.convolve(np.convolve(x, h1), h2)
266    rhs = np.convolve(x, np.convolve(h1, h2))
267    print(f"2. Associativity  (x*h1)*h2 = x*(h1*h2): {np.allclose(lhs, rhs)}")
268
269    # 3. Distributivity
270    lhs = np.convolve(x, h1 + np.pad(h2, (0, len(h1) - len(h2))
271                                     if len(h2) < len(h1) else (len(h2) - len(h1), 0)))
272    # Simpler: pad to equal length
273    max_len = max(len(h1), len(h2))
274    h1_p = np.pad(h1, (0, max_len - len(h1)))
275    h2_p = np.pad(h2, (0, max_len - len(h2)))
276    lhs = np.convolve(x, h1_p + h2_p)
277    rhs = np.convolve(x, h1_p) + np.pad(np.convolve(x, h2_p),
278                                          (0, len(lhs) - len(np.convolve(x, h2_p))))
279    rhs = np.convolve(x, h1_p) + np.convolve(x, h2_p)
280    # Both outputs have the same length here
281    print(f"3. Distributivity x*(h1+h2) = x*h1+x*h2: {np.allclose(lhs, rhs)}")
282
283
284def demo_rc_circuit():
285    """
286    RC circuit impulse response and output for a step input.
287
288    An RC low-pass filter has the continuous-time impulse response:
289        h(t) = (1/RC) * e^(-t/RC) * u(t)
290
291    We discretize at sampling period Ts and compute the output for
292    a unit step input using convolution.
293    """
294    print("\n" + "=" * 60)
295    print("RC CIRCUIT IMPULSE RESPONSE")
296    print("=" * 60)
297
298    RC = 0.1        # time constant (seconds)
299    Ts = 0.001      # sampling period (seconds)
300    T_total = 1.0   # total observation time
301
302    t = np.arange(0, T_total, Ts)
303
304    # Discretized impulse response (scaled so convolution * Ts approximates integral)
305    h = (1 / RC) * np.exp(-t / RC) * Ts
306
307    # Unit step input: x[n] = 1 for all n >= 0
308    x = np.ones(len(t))
309
310    # System output: y = x * h  (truncated to input length)
311    y = np.convolve(x, h)[:len(t)]
312
313    # Analytical step response: y(t) = 1 - e^(-t/RC)
314    y_analytical = 1 - np.exp(-t / RC)
315
316    print(f"RC time constant: {RC} s")
317    print(f"Sampling period:  {Ts} s")
318    print(f"At t = RC = {RC} s, step response should reach 1 - 1/e ≈ {1 - 1/np.e:.4f}")
319    idx_RC = int(RC / Ts)
320    print(f"Numerical value at t=RC:   {y[idx_RC]:.4f}")
321    print(f"Analytical value at t=RC:  {y_analytical[idx_RC]:.4f}")
322    print(f"Max error: {np.max(np.abs(y - y_analytical)):.2e}")
323
324    fig, axes = plt.subplots(1, 3, figsize=(13, 4))
325    fig.suptitle("RC Low-Pass Filter (Convolution)", fontsize=13, fontweight='bold')
326
327    axes[0].plot(t * 1000, h / Ts, 'b')   # scale back to density
328    axes[0].set_title("Impulse Response h(t) = (1/RC)·e^(-t/RC)")
329    axes[0].set_xlabel("t (ms)")
330    axes[0].set_ylabel("h(t)")
331
332    axes[1].plot(t * 1000, x, 'g', linewidth=2)
333    axes[1].set_title("Input x(t) = u(t) (Unit Step)")
334    axes[1].set_xlabel("t (ms)")
335    axes[1].set_ylim(-0.1, 1.5)
336
337    axes[2].plot(t * 1000, y,            'r',    linewidth=2, label='Convolution')
338    axes[2].plot(t * 1000, y_analytical, 'k--',  linewidth=1, label='Analytical')
339    axes[2].axhline(1 - 1/np.e, color='gray', linestyle=':', label='63.2% (t=RC)')
340    axes[2].axvline(RC * 1000,  color='gray', linestyle=':')
341    axes[2].set_title("Output y(t) = 1 - e^(-t/RC)")
342    axes[2].set_xlabel("t (ms)")
343    axes[2].legend()
344
345    for ax in axes:
346        ax.grid(True, alpha=0.3)
347
348    plt.tight_layout()
349    plt.show()
350
351
352if __name__ == "__main__":
353    demo_scratch_vs_numpy()
354    demo_lti_response()
355    demo_moving_average()
356    demo_convolution_stepwise()
357    demo_properties()
358    demo_rc_circuit()
359
360    print("\n" + "=" * 60)
361    print("All demonstrations completed!")
362    print("=" * 60)