1"""
2Kafka Consumer ์์
3
4Kafka ํ ํฝ์์ ๋ฉ์์ง๋ฅผ ์๋นํ๋ Consumer ์์ ์
๋๋ค.
5
6ํ์ ํจํค์ง: pip install confluent-kafka
7
8์คํ ์ :
9 1. Kafka ์คํ ํ์
10 2. producer.py๋ก ๋ฉ์์ง ๋ฐํ
11
12์คํ: python consumer.py
13"""
14
import json
import signal
import sys
from collections import defaultdict
from datetime import datetime
from types import FrameType
from typing import Callable, Dict, List, Optional

from confluent_kafka import Consumer, KafkaError, KafkaException
22
23
class KafkaConsumerExample:
    """Example wrapper around ``confluent_kafka.Consumer``.

    Builds the consumer configuration, installs SIGINT/SIGTERM handlers that
    stop the poll loop, decodes each record (key as UTF-8 text, value as
    UTF-8 JSON) and passes the decoded value plus a metadata dict to a
    user-supplied handler.
    """

    def __init__(
        self,
        bootstrap_servers: str = 'localhost:9092',
        group_id: str = 'example-consumer-group',
        auto_commit: bool = True
    ):
        """Create the consumer and register the shutdown signal handlers.

        Args:
            bootstrap_servers: Broker address(es) for ``bootstrap.servers``.
            group_id: Consumer group id (``group.id``).
            auto_commit: Value for ``enable.auto.commit``. With ``False`` the
                caller is expected to commit explicitly via :meth:`commit`.

        Note:
            ``signal.signal`` may only be called from the main thread, so
            constructing this object from another thread raises ``ValueError``.
        """
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            # Start from the oldest available offset when the group has no
            # committed offset yet.
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': auto_commit,
            'auto.commit.interval.ms': 5000,
            'session.timeout.ms': 45000,
            'max.poll.interval.ms': 300000,
        }
        self.consumer = Consumer(self.config)
        self.running = True
        self.message_count = 0
        self.error_count = 0
        # Makes close() idempotent so a second call never touches a closed consumer.
        self._closed = False

        # Graceful shutdown: the handler only clears ``running``; the loop in
        # consume() checks it before every poll.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum: int, frame: Optional[FrameType]) -> None:
        """Request a stop of the consume loop instead of raising on SIGINT/SIGTERM."""
        print("\n[INFO] Shutdown signal received")
        self.running = False

    @staticmethod
    def _format_partitions(partitions) -> List[str]:
        """Render partitions as ``topic[partition]`` so multi-topic logs are unambiguous."""
        return [f'{p.topic}[{p.partition}]' for p in partitions]

    def subscribe(self, topics: List[str]):
        """Subscribe to ``topics`` and log partition assignments and revocations.

        Args:
            topics: Topic names to subscribe to.
        """
        def on_assign(consumer, partitions):
            print(f'[INFO] Partitions assigned: {self._format_partitions(partitions)}')

        def on_revoke(consumer, partitions):
            print(f'[INFO] Partitions revoked: {self._format_partitions(partitions)}')

        self.consumer.subscribe(topics, on_assign=on_assign, on_revoke=on_revoke)
        print(f'[INFO] Subscribed to topics: {topics}')

    def _parse_message(self, msg) -> Optional[tuple]:
        """Decode ``msg`` into ``(value, metadata)``.

        Returns ``None`` after logging when the record has no value or its key
        or value cannot be decoded, so the caller can skip it.
        """
        raw_value = msg.value()
        if raw_value is None:
            # Null-value records (e.g. tombstones) carry nothing to decode.
            print(f'[WARN] Skipping message without value: '
                  f'{msg.topic()}[{msg.partition()}] offset={msg.offset()}')
            return None

        raw_key = msg.key()
        try:
            key = raw_key.decode('utf-8') if raw_key else None
            value = json.loads(raw_value.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            print(f'[ERROR] Failed to parse message: {e}')
            self.error_count += 1
            return None

        metadata = {
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'timestamp': msg.timestamp(),
            'key': key,
        }
        return value, metadata

    def consume(self, handler: Callable[[dict, dict], None], timeout: float = 1.0):
        """Poll and dispatch messages until a shutdown signal is received.

        Each decoded record is passed to ``handler(value, metadata)``.
        ``value`` is the JSON-decoded payload. ``metadata`` holds ``topic``,
        ``partition``, ``offset``, ``timestamp`` and ``key``.

        Broker errors, undecodable records and ``KafkaException`` are logged
        and counted, and the loop keeps going. Any other exception raised by
        ``handler`` propagates to the caller. The consumer is closed in every
        case.

        Args:
            handler: Callback invoked once per decoded record.
            timeout: Seconds each ``poll`` call may block.
        """
        print('[INFO] Starting consumer loop...')
        try:
            while self.running:
                try:
                    msg = self.consumer.poll(timeout=timeout)
                    if msg is None:
                        continue

                    if msg.error():
                        # End-of-partition is informational, not a failure.
                        if msg.error().code() != KafkaError._PARTITION_EOF:
                            print(f'[ERROR] Consumer error: {msg.error()}')
                            self.error_count += 1
                        continue

                    parsed = self._parse_message(msg)
                    if parsed is None:
                        continue

                    value, metadata = parsed
                    handler(value, metadata)
                    self.message_count += 1

                except KafkaException as e:
                    print(f'[ERROR] Kafka exception: {e}')
                    self.error_count += 1
        finally:
            # Leave the group cleanly even if the handler raised.
            self.close()

    def commit(self, asynchronous: bool = True):
        """Commit the consumer's current offsets (for manual-commit mode).

        Args:
            asynchronous: Passed to ``Consumer.commit``. The default ``True``
                keeps the previous behaviour. ``False`` blocks until the
                commit completes, so a failure surfaces as ``KafkaException``
                here.
        """
        self.consumer.commit(asynchronous=asynchronous)

    def close(self):
        """Close the consumer once and print a summary; later calls do nothing."""
        if self._closed:
            return
        self._closed = True
        print('\n[INFO] Closing consumer...')
        self.consumer.close()
        print(f'[SUMMARY] Total consumed: {self.message_count}, Errors: {self.error_count}')
123
124
class MessageAggregator:
    """Accumulate per-event-type counts and amounts over a reporting window."""

    def __init__(self, window_seconds: int = 60):
        """Start an empty window.

        Args:
            window_seconds: Minimum number of seconds between reports.
        """
        self.window_seconds = window_seconds
        self.counts = defaultdict(int)
        self.amounts = defaultdict(float)
        self.last_report = datetime.now()

    def add(self, event_type: str, amount: float = 0):
        """Record one event of ``event_type`` contributing ``amount`` to its total."""
        self.counts[event_type] += 1
        self.amounts[event_type] += amount

    def should_report(self) -> bool:
        """Return True once ``window_seconds`` have elapsed since the last report."""
        # total_seconds() rather than .seconds: the .seconds attribute drops
        # the days component, so a gap of over 24h would wrap and could
        # suppress the report.
        elapsed = (datetime.now() - self.last_report).total_seconds()
        return elapsed >= self.window_seconds

    def report_and_reset(self) -> dict:
        """Return the current window's totals and start a new window.

        Returns:
            A dict with ``window_end`` (``datetime.now().isoformat()`` string),
            ``counts`` and ``amounts`` (plain-dict copies keyed by event type).
        """
        now = datetime.now()
        report = {
            'window_end': now.isoformat(),
            'counts': dict(self.counts),
            'amounts': dict(self.amounts),
        }
        self.counts.clear()
        self.amounts.clear()
        # Reuse the same instant as window_end so consecutive windows are contiguous.
        self.last_report = now
        return report
155
156
def demo_simple_consumer():
    """Consume 'demo-topic' and print every record with its location, key and value."""
    banner = "=" * 60
    print(banner)
    print("Simple Consumer Demo")
    print(banner)

    def message_handler(value: dict, metadata: dict):
        # One block per record: where it came from, then its key and payload.
        location = (f"Received: topic={metadata['topic']}, "
                    f"partition={metadata['partition']}, "
                    f"offset={metadata['offset']}")
        print(location)
        print(f" Key: {metadata['key']}")
        print(f" Value: {value}")
        print()

    consumer = KafkaConsumerExample(group_id='simple-consumer-group')
    consumer.subscribe(['demo-topic'])
    consumer.consume(message_handler)
175
176
def demo_order_consumer():
    """Consume 'orders', periodically print per-status totals and flag large orders.

    Expects each record to be a JSON object with optional ``status``,
    ``amount`` and ``order_id`` fields. NOTE(review): the schema comes from
    producer.py, which is not visible here.
    """
    print("\n" + "=" * 60)
    print("Order Consumer Demo")
    print("=" * 60)

    consumer = KafkaConsumerExample(group_id='order-consumer-group')
    consumer.subscribe(['orders'])

    aggregator = MessageAggregator(window_seconds=10)

    def order_handler(value: dict, metadata: dict):
        # A missing or null amount counts as 0 rather than raising TypeError
        # in the aggregation or the comparison below.
        amount = value.get('amount') or 0

        # Aggregate by order status.
        aggregator.add(value.get('status', 'unknown'), amount)

        # Periodic report; only checked when a message arrives.
        if aggregator.should_report():
            report = aggregator.report_and_reset()
            print(f"\n[REPORT] {report['window_end']}")
            print(f" Order counts: {report['counts']}")
            print(f" Total amounts: {report['amounts']}")

        # High-value order alert. Use .get() so a record without order_id
        # cannot raise KeyError and stop the consumer loop.
        if amount > 500:
            print(f"[ALERT] High-value order: {value.get('order_id')} - ${amount}")

    print("Processing orders (Press Ctrl+C to stop)...")
    consumer.consume(order_handler)
205
206
def demo_multi_topic_consumer():
    """Consume orders, clickstream and inventory with one consumer, routing each record by topic."""
    separator = "=" * 60
    print("\n" + separator)
    print("Multi-Topic Consumer Demo")
    print(separator)

    consumer = KafkaConsumerExample(group_id='multi-topic-group')
    consumer.subscribe(['orders', 'clickstream', 'inventory'])

    # topic -> number of records seen so far (includes topics with no route)
    seen_per_topic = defaultdict(int)

    def on_order(record: dict):
        print(f"[ORDER] {record.get('order_id')}: ${record.get('amount')}")

    def on_click(record: dict):
        # Clicks are high-volume; report progress only on every 10th one.
        seen = seen_per_topic['clickstream']
        if seen % 10 == 0:
            print(f"[CLICK] Processed {seen} events")

    def on_inventory(record: dict):
        # A missing stock field defaults to 100, i.e. never "low".
        stock = record.get('current_stock', 100)
        if stock < 10:
            print(f"[LOW STOCK] {record.get('product_sku')}: {stock} units")

    routes = {
        'orders': on_order,
        'clickstream': on_click,
        'inventory': on_inventory,
    }

    def multi_handler(value: dict, metadata: dict):
        topic = metadata['topic']
        seen_per_topic[topic] += 1
        route = routes.get(topic)
        if route is not None:
            route(value)

    print("Processing multiple topics (Press Ctrl+C to stop)...")
    consumer.consume(multi_handler)
234
235
def demo_manual_commit_consumer():
    """Consume 'batch-orders' with auto-commit off, committing after every 10 records.

    Records left in a partial batch when the loop stops are neither
    summarised nor committed by this handler.
    """
    line = "=" * 60
    print("\n" + line)
    print("Manual Commit Consumer Demo")
    print(line)

    consumer = KafkaConsumerExample(
        group_id='manual-commit-group',
        auto_commit=False  # offsets are committed explicitly in flush_batch()
    )
    consumer.subscribe(['batch-orders'])

    pending = []
    batch_size = 10

    def flush_batch():
        # Summarise the batch, then commit only after that work succeeded.
        print(f"\n[BATCH] Processing {len(pending)} messages...")
        batch_total = sum(record.get('amount', 0) for record in pending)
        print(f" Total amount: ${batch_total:.2f}")
        consumer.commit()
        print(" Committed offset")
        # Cleared only after a successful commit; a raising commit keeps the batch.
        pending.clear()

    def batch_handler(value: dict, metadata: dict):
        pending.append(value)
        if len(pending) < batch_size:
            return
        flush_batch()

    print("Processing in batches with manual commit (Press Ctrl+C to stop)...")
    consumer.consume(batch_handler)
270
271
def demo_stateful_consumer():
    """Track per-customer order count and spend from 'orders' and flag big spenders.

    State is kept in memory only and is never pruned, so it grows with the
    number of distinct customer ids seen.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Stateful Consumer Demo")
    print(rule)

    consumer = KafkaConsumerExample(group_id='stateful-group')
    consumer.subscribe(['orders'])

    def fresh_state():
        return {
            'order_count': 0,
            'total_spent': 0.0,
            'last_order': None,
        }

    # customer_id -> running totals for that customer
    customer_state = defaultdict(fresh_state)

    def stateful_handler(value: dict, metadata: dict):
        customer_id = value.get('customer_id')
        if customer_id:
            state = customer_state[customer_id]
            state['order_count'] += 1
            state['total_spent'] += value.get('amount', 0)
            state['last_order'] = value.get('timestamp')

            # VIP detection: once past the threshold, every later order re-prints.
            if state['total_spent'] > 2000:
                print(f"[VIP] Customer {customer_id}: "
                      f"{state['order_count']} orders, ${state['total_spent']:.2f} total")

    print("Tracking customer state (Press Ctrl+C to stop)...")
    consumer.consume(stateful_handler)
305
306
def main():
    """Show the demo menu, read a selection from stdin and run it.

    Any input that is not a menu key falls back to the simple consumer demo.
    """
    intro = (
        "Kafka Consumer Examples",
        "=" * 60,
        "Make sure Kafka is running and has messages",
        "Run producer.py first to generate messages",
    )
    for text in intro:
        print(text)
    print()

    demos = {
        '1': ('Simple Consumer', demo_simple_consumer),
        '2': ('Order Consumer', demo_order_consumer),
        '3': ('Multi-Topic Consumer', demo_multi_topic_consumer),
        '4': ('Manual Commit Consumer', demo_manual_commit_consumer),
        '5': ('Stateful Consumer', demo_stateful_consumer),
    }

    print("Available demos:")
    for number, (title, _runner) in demos.items():
        print(f" {number}. {title}")

    selection = input("\nSelect demo (1-5): ").strip()

    entry = demos.get(selection)
    if entry is None:
        print("Invalid choice, running simple demo...")
        demo_simple_consumer()
    else:
        _title, run_demo = entry
        run_demo()


if __name__ == "__main__":
    # Run the interactive menu only when executed as a script.
    main()