1"""
2Kafka Producer 예제
3
4Kafka 토픽에 메시지를 발행하는 Producer 예제입니다.
5
6필수 패키지: pip install confluent-kafka
7
8실행 전 Kafka 실행 필요:
9 docker run -d --name kafka -p 9092:9092 \
10 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
11 confluentinc/cp-kafka:latest
12
13실행: python producer.py
14"""
15
16from confluent_kafka import Producer
17from confluent_kafka.admin import AdminClient, NewTopic
18import json
19import time
20import random
21from datetime import datetime
22from typing import Optional
23
24
class KafkaProducerExample:
    """Small wrapper around ``confluent_kafka.Producer`` used by the demos.

    The wrapper owns a single producer, sends message values as UTF-8
    encoded JSON and keeps running totals of delivered and failed messages
    as reported through the delivery callback.

    Attributes:
        config: The configuration dict handed to ``Producer``.
        producer: The underlying ``confluent_kafka.Producer``.
        message_count: Number of messages reported as delivered.
        error_count: Number of messages reported as failed.
    """

    # How many times send_message() retries while the local producer queue
    # is full, and how long (seconds) it lets poll() wait before each retry.
    BUFFER_FULL_MAX_RETRIES = 30
    BUFFER_FULL_WAIT_SECONDS = 1.0

    def __init__(self, bootstrap_servers: str = 'localhost:9092'):
        """Create the producer.

        Args:
            bootstrap_servers: Broker address list passed as
                ``bootstrap.servers``.
        """
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'example-producer',
            'acks': 'all',  # wait for acknowledgement from all replicas
            'retries': 3,
            'retry.backoff.ms': 100,
            'linger.ms': 5,  # how long to wait for a batch to fill
            'batch.size': 16384,  # batch size
        }
        self.producer = Producer(self.config)
        self.message_count = 0
        self.error_count = 0

    def delivery_callback(self, err, msg):
        """Record the delivery result of one message.

        Passed to ``produce()`` as the per-message callback.

        Args:
            err: Falsy on success, otherwise the delivery error.
            msg: The message the report refers to (unused here).
        """
        if err:
            print(f'[ERROR] Message delivery failed: {err}')
            self.error_count += 1
        else:
            self.message_count += 1
            # Only report progress every 100 deliveries to keep output short.
            if self.message_count % 100 == 0:
                print(f'[INFO] Delivered {self.message_count} messages')

    def create_topic(self, topic_name: str, num_partitions: int = 3, replication_factor: int = 1):
        """Create a topic, treating "already exists" as success.

        Args:
            topic_name: Name of the topic to create.
            num_partitions: Partition count for the new topic.
            replication_factor: Replication factor for the new topic.

        Raises:
            confluent_kafka.KafkaException: If creation fails for any reason
                other than the topic already existing.
        """
        # Imported locally so the module-level import block stays untouched.
        from confluent_kafka import KafkaError, KafkaException

        admin_client = AdminClient({'bootstrap.servers': self.config['bootstrap.servers']})
        topic = NewTopic(
            topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        futures = admin_client.create_topics([topic])

        try:
            futures[topic_name].result()  # block until the broker answers
        except KafkaException as exc:
            # Compare the error code instead of matching on the message text,
            # which is not a stable interface.
            error = exc.args[0] if exc.args else None
            already_exists = (
                isinstance(error, KafkaError)
                and error.code() == KafkaError.TOPIC_ALREADY_EXISTS
            )
            if not already_exists:
                raise  # bare raise keeps the original traceback
            print(f'[INFO] Topic "{topic_name}" already exists')
        else:
            print(f'[INFO] Topic "{topic_name}" created')

    def send_message(self, topic: str, key: Optional[str], value: dict):
        """Queue a single message for delivery.

        Args:
            topic: Destination topic.
            key: Message key; ``None`` sends the message without a key. An
                empty string is sent as an empty key, not dropped.
            value: JSON-serialisable payload.

        Raises:
            BufferError: If the local queue is still full after
                ``BUFFER_FULL_MAX_RETRIES`` retries.
            TypeError: If ``value`` cannot be serialised by ``json.dumps``.
        """
        # Serialise once, outside the retry loop.
        encoded_key = key.encode('utf-8') if key is not None else None
        payload = json.dumps(value).encode('utf-8')

        # A bounded loop replaces the former unbounded recursion.
        for attempt in range(self.BUFFER_FULL_MAX_RETRIES + 1):
            try:
                self.producer.produce(
                    topic=topic,
                    key=encoded_key,
                    value=payload,
                    callback=self.delivery_callback,
                )
            except BufferError:
                if attempt == self.BUFFER_FULL_MAX_RETRIES:
                    raise
                print('[WARN] Buffer full, waiting...')
                # Serve delivery reports so the queue can drain, then retry.
                self.producer.poll(self.BUFFER_FULL_WAIT_SECONDS)
            else:
                # Non-blocking poll to serve pending delivery callbacks.
                self.producer.poll(0)
                return

    def flush(self):
        """Block until all queued messages have been handled."""
        self.producer.flush()

    def close(self):
        """Flush outstanding messages and print the delivery summary."""
        self.flush()
        print(f'\n[SUMMARY] Total sent: {self.message_count}, Errors: {self.error_count}')
97
98
def generate_order_event() -> dict:
    """Build one randomly populated order event.

    Returns:
        A dict with the keys ``event_type``, ``order_id``, ``customer_id``,
        ``product``, ``quantity``, ``amount``, ``status`` and ``timestamp``
        (local time in ISO-8601 format).
    """
    catalog = ('laptop', 'phone', 'tablet', 'headphones', 'keyboard', 'mouse')
    lifecycle = ('created', 'confirmed', 'shipped', 'delivered')

    # The random draws happen in the same order as the keys of the result.
    order_no = random.randint(10000, 99999)
    customer_no = random.randint(1, 1000)

    event = dict(
        event_type='order',
        order_id=f'ORD-{order_no}',
        customer_id=f'CUST-{customer_no}',
        product=random.choice(catalog),
        quantity=random.randint(1, 5),
        amount=round(random.uniform(10, 1000), 2),
        status=random.choice(lifecycle),
        timestamp=datetime.now().isoformat(),
    )
    return event
114
115
def generate_clickstream_event() -> dict:
    """Build one randomly populated clickstream event.

    Returns:
        A dict with the keys ``event_type``, ``event_id``, ``user_id``,
        ``session_id``, ``page``, ``action`` and ``timestamp`` (local time
        in ISO-8601 format).
    """
    site_pages = ('/home', '/products', '/cart', '/checkout', '/profile', '/search')
    interactions = ('view', 'click', 'scroll', 'hover')

    # Fields are filled one by one, in the order they appear in the result.
    event = {'event_type': 'clickstream'}
    event['event_id'] = 'EVT-' + str(random.randint(100000, 999999))
    event['user_id'] = 'USER-' + str(random.randint(1, 500))
    event['session_id'] = 'SESS-' + str(random.randint(1000, 9999))
    event['page'] = random.choice(site_pages)
    event['action'] = random.choice(interactions)
    event['timestamp'] = datetime.now().isoformat()
    return event
130
131
def generate_inventory_event() -> dict:
    """Build one randomly populated inventory event.

    Returns:
        A dict with the keys ``event_type``, ``product_sku``, ``warehouse``,
        ``quantity_change``, ``current_stock`` and ``timestamp`` (local time
        in ISO-8601 format).
    """
    skus = tuple(f'SKU-00{n}' for n in range(1, 6))
    sites = ('WH-EAST', 'WH-WEST', 'WH-CENTRAL')

    # Draw the values first, in the same order as the result keys.
    sku = random.choice(skus)
    site = random.choice(sites)
    delta = random.randint(-50, 100)
    stock = random.randint(0, 500)
    stamped_at = datetime.now().isoformat()

    return {
        'event_type': 'inventory',
        'product_sku': sku,
        'warehouse': site,
        'quantity_change': delta,
        'current_stock': stock,
        'timestamp': stamped_at,
    }
145
146
def demo_simple_producer():
    """Run the simple demo: send ten greeting messages to ``demo-topic``.

    Creates the topic, sends each message with a key, prints what was sent
    and finally closes the producer (which flushes and prints the summary).
    """
    rule = "=" * 60
    print(rule)
    print("Simple Producer Demo")
    print(rule)

    demo = KafkaProducerExample()

    # Make sure the destination topic exists.
    demo.create_topic('demo-topic')

    # Send the messages one at a time.
    for seq in range(10):
        payload = dict(
            id=seq,
            message=f'Hello Kafka #{seq}',
            timestamp=datetime.now().isoformat(),
        )
        # Keys cycle through key-0, key-1, key-2 (intended to spread the
        # messages over the 3 partitions).
        routing_key = f'key-{seq % 3}'
        demo.send_message(topic='demo-topic', key=routing_key, value=payload)
        print(f'Sent: {payload}')

    demo.close()
173
174
def demo_event_stream():
    """Run the streaming demo until interrupted with Ctrl+C.

    Each round may emit an order event, always emits one to five
    clickstream events and may emit an inventory event, then sleeps for
    100 ms. On ``KeyboardInterrupt`` the loop stops and the producer is
    closed.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Event Stream Demo")
    print(rule)

    stream = KafkaProducerExample()

    # Make sure all three destination topics exist.
    for topic_name, partitions in (('orders', 3), ('clickstream', 6), ('inventory', 3)):
        stream.create_topic(topic_name, num_partitions=partitions)

    print("\nStreaming events (Press Ctrl+C to stop)...")

    rounds = 0
    try:
        while True:
            # Order events: low frequency (30% of rounds).
            if random.random() < 0.3:
                order = generate_order_event()
                stream.send_message('orders', order['customer_id'], order)

            # Clickstream events: high frequency (1-5 per round).
            clicks_this_round = random.randint(1, 5)
            for _ in range(clicks_this_round):
                click = generate_clickstream_event()
                stream.send_message('clickstream', click['user_id'], click)

            # Inventory events: medium frequency (50% of rounds).
            if random.random() < 0.5:
                stock_update = generate_inventory_event()
                stream.send_message('inventory', stock_update['product_sku'], stock_update)

            rounds += 1
            if not rounds % 50:
                print(f'Generated {rounds} event batches')

            time.sleep(0.1)  # 100 ms between rounds

    except KeyboardInterrupt:
        print("\nStopping...")

    stream.close()
218
219
def demo_batch_producer(batch_size: int = 1000):
    """Run the batch demo: send many order events and report throughput.

    Args:
        batch_size: Number of order events to send. Defaults to 1000, the
            value the demo previously hard-coded.
    """
    print("\n" + "=" * 60)
    print("Batch Producer Demo")
    print("=" * 60)

    producer = KafkaProducerExample()
    producer.create_topic('batch-orders')

    print(f"\nSending {batch_size} messages in batch...")

    # perf_counter() is monotonic and high resolution, unlike the wall clock,
    # so the measured interval cannot go backwards.
    started = time.perf_counter()

    for _ in range(batch_size):
        event = generate_order_event()
        producer.send_message('batch-orders', event['order_id'], event)

    # Include the time needed to actually deliver the queued messages.
    producer.flush()
    elapsed = time.perf_counter() - started

    print("\nBatch completed:")
    print(f" Messages: {batch_size}")
    print(f" Time: {elapsed:.2f} seconds")
    if elapsed > 0:
        print(f" Throughput: {batch_size / elapsed:.0f} messages/second")
    else:
        # Avoid ZeroDivisionError if the interval is below timer resolution.
        print(" Throughput: n/a (elapsed time too small to measure)")

    producer.close()
247
248
def main():
    """Show the demo menu, read a choice from stdin and run it.

    ``all`` (any letter case) runs every demo in menu order; a menu number
    runs that demo; any other input falls back to the simple demo. If stdin
    is closed before a choice can be read, the function returns without
    running anything.
    """
    print("Kafka Producer Examples")
    print("=" * 60)
    print("Make sure Kafka is running on localhost:9092")
    print()

    # Menu key -> (label, demo function).
    demos = {
        '1': ('Simple Producer', demo_simple_producer),
        '2': ('Event Stream', demo_event_stream),
        '3': ('Batch Producer', demo_batch_producer),
    }

    print("Available demos:")
    for key, (name, _) in demos.items():
        print(f" {key}. {name}")

    try:
        choice = input("\nSelect demo (1-3) or 'all': ").strip().lower()
    except EOFError:
        # input() raises EOFError when stdin is closed (e.g. the script is
        # run non-interactively); exit cleanly instead of with a traceback.
        print("\nNo input available, exiting.")
        return

    if choice == 'all':
        for _, run_demo in demos.values():
            run_demo()
    elif choice in demos:
        demos[choice][1]()
    else:
        print("Invalid choice, running simple demo...")
        demo_simple_producer()


# Run the interactive menu only when executed as a script.
if __name__ == "__main__":
    main()