consumer.py

Download
python 337 lines 10.3 KB
  1"""
Kafka Consumer example

An example Consumer that consumes messages from a Kafka topic.

Required package: pip install confluent-kafka

Before running:
  1. Kafka must be running
  2. Publish messages with producer.py

Run: python consumer.py
 13"""
 14
 15from confluent_kafka import Consumer, KafkaError, KafkaException
 16import json
 17import signal
 18import sys
 19from datetime import datetime
 20from typing import Callable, Optional, Dict, List
 21from collections import defaultdict
 22
 23
 24class KafkaConsumerExample:
 25    """Kafka Consumer ์˜ˆ์ œ ํด๋ž˜์Šค"""
 26
 27    def __init__(
 28        self,
 29        bootstrap_servers: str = 'localhost:9092',
 30        group_id: str = 'example-consumer-group',
 31        auto_commit: bool = True
 32    ):
 33        self.config = {
 34            'bootstrap.servers': bootstrap_servers,
 35            'group.id': group_id,
 36            'auto.offset.reset': 'earliest',  # ์ฒ˜์Œ๋ถ€ํ„ฐ ์ฝ๊ธฐ
 37            'enable.auto.commit': auto_commit,
 38            'auto.commit.interval.ms': 5000,
 39            'session.timeout.ms': 45000,
 40            'max.poll.interval.ms': 300000,
 41        }
 42        self.consumer = Consumer(self.config)
 43        self.running = True
 44        self.message_count = 0
 45        self.error_count = 0
 46
 47        # Graceful shutdown ์„ค์ •
 48        signal.signal(signal.SIGINT, self._signal_handler)
 49        signal.signal(signal.SIGTERM, self._signal_handler)
 50
 51    def _signal_handler(self, signum, frame):
 52        """์ข…๋ฃŒ ์‹œ๊ทธ๋„ ํ•ธ๋“ค๋Ÿฌ"""
 53        print("\n[INFO] Shutdown signal received")
 54        self.running = False
 55
 56    def subscribe(self, topics: List[str]):
 57        """ํ† ํ”ฝ ๊ตฌ๋…"""
 58        def on_assign(consumer, partitions):
 59            print(f'[INFO] Partitions assigned: {[p.partition for p in partitions]}')
 60
 61        def on_revoke(consumer, partitions):
 62            print(f'[INFO] Partitions revoked: {[p.partition for p in partitions]}')
 63
 64        self.consumer.subscribe(topics, on_assign=on_assign, on_revoke=on_revoke)
 65        print(f'[INFO] Subscribed to topics: {topics}')
 66
 67    def consume(self, handler: Callable[[dict, dict], None], timeout: float = 1.0):
 68        """๋ฉ”์‹œ์ง€ ์†Œ๋น„ ๋ฃจํ”„"""
 69        print('[INFO] Starting consumer loop...')
 70
 71        while self.running:
 72            try:
 73                msg = self.consumer.poll(timeout=timeout)
 74
 75                if msg is None:
 76                    continue
 77
 78                if msg.error():
 79                    if msg.error().code() == KafkaError._PARTITION_EOF:
 80                        # ํŒŒํ‹ฐ์…˜ ๋์— ๋„๋‹ฌ
 81                        continue
 82                    else:
 83                        print(f'[ERROR] Consumer error: {msg.error()}')
 84                        self.error_count += 1
 85                        continue
 86
 87                # ๋ฉ”์‹œ์ง€ ํŒŒ์‹ฑ
 88                try:
 89                    key = msg.key().decode('utf-8') if msg.key() else None
 90                    value = json.loads(msg.value().decode('utf-8'))
 91
 92                    metadata = {
 93                        'topic': msg.topic(),
 94                        'partition': msg.partition(),
 95                        'offset': msg.offset(),
 96                        'timestamp': msg.timestamp(),
 97                        'key': key,
 98                    }
 99
100                    # ํ•ธ๋“ค๋Ÿฌ ํ˜ธ์ถœ
101                    handler(value, metadata)
102                    self.message_count += 1
103
104                except json.JSONDecodeError as e:
105                    print(f'[ERROR] Failed to parse message: {e}')
106                    self.error_count += 1
107
108            except KafkaException as e:
109                print(f'[ERROR] Kafka exception: {e}')
110                self.error_count += 1
111
112        self.close()
113
114    def commit(self):
115        """์ˆ˜๋™ ์ปค๋ฐ‹"""
116        self.consumer.commit()
117
118    def close(self):
119        """Consumer ์ข…๋ฃŒ"""
120        print('\n[INFO] Closing consumer...')
121        self.consumer.close()
122        print(f'[SUMMARY] Total consumed: {self.message_count}, Errors: {self.error_count}')
123
124
class MessageAggregator:
    """Accumulate per-event-type counts and amounts between reports.

    ``counts`` and ``amounts`` are keyed by event type. ``last_report`` is the
    time of construction or of the most recent ``report_and_reset`` call.
    """

    def __init__(self, window_seconds: int = 60):
        """Start an empty window.

        Args:
            window_seconds: Minimum number of seconds between reports.
        """
        self.window_seconds = window_seconds
        self.counts = defaultdict(int)
        self.amounts = defaultdict(float)
        self.last_report = datetime.now()

    def add(self, event_type: str, amount: float = 0):
        """Record one event of ``event_type`` and add ``amount`` to its total."""
        self.counts[event_type] += 1
        self.amounts[event_type] += amount

    def should_report(self) -> bool:
        """Return True once at least ``window_seconds`` have elapsed."""
        # total_seconds() is the whole signed duration. The .seconds attribute
        # is only the 0..86399 component, so it wraps after a day and turns a
        # small negative delta into a large positive number.
        elapsed = (datetime.now() - self.last_report).total_seconds()
        return elapsed >= self.window_seconds

    def report_and_reset(self) -> dict:
        """Return the current window's totals and start a new window.

        Returns:
            A dict with ``window_end`` (ISO-format timestamp), ``counts`` and
            ``amounts`` (plain-dict copies taken before the reset).
        """
        now = datetime.now()
        report = {
            'window_end': now.isoformat(),
            'counts': dict(self.counts),
            'amounts': dict(self.amounts),
        }
        self.counts.clear()
        self.amounts.clear()
        # Reuse the same timestamp so the new window starts where this ended.
        self.last_report = now
        return report
155
156
def demo_simple_consumer():
    """Simple consumer demo: print every message received on ``demo-topic``."""
    divider = "=" * 60
    print(divider)
    print("Simple Consumer Demo")
    print(divider)

    def message_handler(value: dict, metadata: dict):
        # Show where the message came from, then its key and payload.
        origin = (
            f"topic={metadata['topic']}, "
            f"partition={metadata['partition']}, "
            f"offset={metadata['offset']}"
        )
        print(f"Received: {origin}")
        print(f"  Key: {metadata['key']}")
        print(f"  Value: {value}")
        print()

    simple_consumer = KafkaConsumerExample(group_id='simple-consumer-group')
    simple_consumer.subscribe(['demo-topic'])
    simple_consumer.consume(message_handler)
175
176
def demo_order_consumer():
    """Order-event consumer demo.

    Consumes the ``orders`` topic, aggregates counts and amounts per order
    status, prints a report once the 10-second window has elapsed (checked
    when a message arrives), and prints an alert for orders above 500.
    """
    print("\n" + "=" * 60)
    print("Order Consumer Demo")
    print("=" * 60)

    consumer = KafkaConsumerExample(group_id='order-consumer-group')
    consumer.subscribe(['orders'])

    aggregator = MessageAggregator(window_seconds=10)

    def order_handler(value: dict, metadata: dict):
        # A missing or JSON-null amount counts as 0, so neither the running
        # total nor the comparison below ever sees None.
        amount = value.get('amount') or 0

        # Aggregate the event under its status.
        aggregator.add(value.get('status', 'unknown'), amount)

        # Periodic report.
        if aggregator.should_report():
            report = aggregator.report_and_reset()
            print(f"\n[REPORT] {report['window_end']}")
            print(f"  Order counts: {report['counts']}")
            print(f"  Total amounts: {report['amounts']}")

        # Special-case handling (e.g. high-value orders). ``.get`` keeps a
        # message without ``order_id`` from raising KeyError.
        if amount > 500:
            print(f"[ALERT] High-value order: {value.get('order_id')} - ${amount}")

    print("Processing orders (Press Ctrl+C to stop)...")
    consumer.consume(order_handler)
205
206
def demo_multi_topic_consumer():
    """Multi-topic consumer demo with per-topic handling.

    Subscribes to ``orders``, ``clickstream`` and ``inventory`` and counts
    the messages seen on each topic.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("Multi-Topic Consumer Demo")
    print(divider)

    multi_consumer = KafkaConsumerExample(group_id='multi-topic-group')
    multi_consumer.subscribe(['orders', 'clickstream', 'inventory'])

    seen_per_topic = defaultdict(int)

    def multi_handler(value: dict, metadata: dict):
        source = metadata['topic']
        seen_per_topic[source] += 1

        # Per-topic handling, one guard clause per topic.
        if source == 'orders':
            print(f"[ORDER] {value.get('order_id')}: ${value.get('amount')}")
            return

        if source == 'clickstream':
            # Print only on every 10th click event.
            if seen_per_topic[source] % 10 == 0:
                print(f"[CLICK] Processed {seen_per_topic[source]} events")
            return

        if source == 'inventory' and value.get('current_stock', 100) < 10:
            print(f"[LOW STOCK] {value.get('product_sku')}: {value.get('current_stock')} units")

    print("Processing multiple topics (Press Ctrl+C to stop)...")
    multi_consumer.consume(multi_handler)
234
235
def demo_manual_commit_consumer():
    """Manual-commit consumer demo.

    Consumes ``batch-orders`` with auto-commit turned off, buffers messages,
    and commits after each batch of 10 has been processed.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("Manual Commit Consumer Demo")
    print(divider)

    # Auto-commit is disabled; the handler commits explicitly.
    manual_consumer = KafkaConsumerExample(
        group_id='manual-commit-group',
        auto_commit=False,
    )
    manual_consumer.subscribe(['batch-orders'])

    pending = []
    flush_threshold = 10

    def batch_handler(value: dict, metadata: dict):
        pending.append(value)

        # Keep buffering until a full batch is available.
        if len(pending) < flush_threshold:
            return

        print(f"\n[BATCH] Processing {len(pending)} messages...")

        # Batch processing logic.
        total_amount = sum(entry.get('amount', 0) for entry in pending)
        print(f"  Total amount: ${total_amount:.2f}")

        # Commit only after the batch was processed successfully.
        manual_consumer.commit()
        print("  Committed offset")

        pending.clear()

    print("Processing in batches with manual commit (Press Ctrl+C to stop)...")
    manual_consumer.consume(batch_handler)
270
271
def demo_stateful_consumer():
    """Stateful consumer demo.

    Consumes ``orders`` and keeps, per customer, the order count, the total
    spent and the timestamp of the last order.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("Stateful Consumer Demo")
    print(divider)

    stateful_consumer = KafkaConsumerExample(group_id='stateful-group')
    stateful_consumer.subscribe(['orders'])

    def fresh_record():
        # Starting state for a customer seen for the first time.
        return {
            'order_count': 0,
            'total_spent': 0.0,
            'last_order': None,
        }

    # Per-customer state, created on first access.
    customer_state = defaultdict(fresh_record)

    def stateful_handler(value: dict, metadata: dict):
        customer_id = value.get('customer_id')
        if customer_id:
            record = customer_state[customer_id]
            record['order_count'] += 1
            record['total_spent'] += value.get('amount', 0)
            record['last_order'] = value.get('timestamp')

            # VIP customer detection.
            if record['total_spent'] > 2000:
                print(f"[VIP] Customer {customer_id}: "
                      f"{record['order_count']} orders, ${record['total_spent']:.2f} total")

    print("Tracking customer state (Press Ctrl+C to stop)...")
    stateful_consumer.consume(stateful_handler)
305
306
def main():
    """Print the demo menu, read a choice from stdin and run that demo.

    Any input other than 1-5 falls back to the simple consumer demo.
    """
    for banner_line in (
        "Kafka Consumer Examples",
        "=" * 60,
        "Make sure Kafka is running and has messages",
        "Run producer.py first to generate messages",
    ):
        print(banner_line)
    print()

    demos = {
        '1': ('Simple Consumer', demo_simple_consumer),
        '2': ('Order Consumer', demo_order_consumer),
        '3': ('Multi-Topic Consumer', demo_multi_topic_consumer),
        '4': ('Manual Commit Consumer', demo_manual_commit_consumer),
        '5': ('Stateful Consumer', demo_stateful_consumer),
    }

    print("Available demos:")
    for number, entry in demos.items():
        print(f"  {number}. {entry[0]}")

    choice = input("\nSelect demo (1-5): ").strip()
    selected = demos.get(choice)

    if selected is None:
        print("Invalid choice, running simple demo...")
        demo_simple_consumer()
        return

    _, run_demo = selected
    run_demo()
333
334
# Run the interactive demo menu only when this file is executed as a script.
if __name__ == "__main__":
    main()