producer.py

Download
python 280 lines 8.3 KB
  1"""
  2Kafka Producer 예제
  3
  4Kafka 토픽에 메시지를 발행하는 Producer 예제입니다.
  5
  6필수 패키지: pip install confluent-kafka
  7
  8실행 전 Kafka 실행 필요:
  9  docker run -d --name kafka -p 9092:9092 \
 10    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
 11    confluentinc/cp-kafka:latest
 12
 13실행: python producer.py
 14"""
 15
 16from confluent_kafka import Producer
 17from confluent_kafka.admin import AdminClient, NewTopic
 18import json
 19import time
 20import random
 21from datetime import datetime
 22from typing import Optional
 23
 24
 25class KafkaProducerExample:
 26    """Kafka Producer 예제 클래스"""
 27
 28    def __init__(self, bootstrap_servers: str = 'localhost:9092'):
 29        self.config = {
 30            'bootstrap.servers': bootstrap_servers,
 31            'client.id': 'example-producer',
 32            'acks': 'all',  # 모든 replica 확인
 33            'retries': 3,
 34            'retry.backoff.ms': 100,
 35            'linger.ms': 5,  # 배치 대기 시간
 36            'batch.size': 16384,  # 배치 크기
 37        }
 38        self.producer = Producer(self.config)
 39        self.message_count = 0
 40        self.error_count = 0
 41
 42    def delivery_callback(self, err, msg):
 43        """메시지 전송 결과 콜백"""
 44        if err:
 45            print(f'[ERROR] Message delivery failed: {err}')
 46            self.error_count += 1
 47        else:
 48            self.message_count += 1
 49            if self.message_count % 100 == 0:
 50                print(f'[INFO] Delivered {self.message_count} messages')
 51
 52    def create_topic(self, topic_name: str, num_partitions: int = 3, replication_factor: int = 1):
 53        """토픽 생성"""
 54        admin_client = AdminClient({'bootstrap.servers': self.config['bootstrap.servers']})
 55
 56        topic = NewTopic(
 57            topic_name,
 58            num_partitions=num_partitions,
 59            replication_factor=replication_factor
 60        )
 61
 62        try:
 63            futures = admin_client.create_topics([topic])
 64            futures[topic_name].result()  # 생성 완료 대기
 65            print(f'[INFO] Topic "{topic_name}" created')
 66        except Exception as e:
 67            if 'already exists' in str(e):
 68                print(f'[INFO] Topic "{topic_name}" already exists')
 69            else:
 70                raise e
 71
 72    def send_message(self, topic: str, key: Optional[str], value: dict):
 73        """단일 메시지 전송"""
 74        try:
 75            self.producer.produce(
 76                topic=topic,
 77                key=key.encode('utf-8') if key else None,
 78                value=json.dumps(value).encode('utf-8'),
 79                callback=self.delivery_callback
 80            )
 81            # 주기적으로 이벤트 처리
 82            self.producer.poll(0)
 83        except BufferError:
 84            # 버퍼가 꽉 찬 경우 대기
 85            print('[WARN] Buffer full, waiting...')
 86            self.producer.flush()
 87            self.send_message(topic, key, value)
 88
 89    def flush(self):
 90        """모든 메시지 전송 완료 대기"""
 91        self.producer.flush()
 92
 93    def close(self):
 94        """Producer 종료"""
 95        self.flush()
 96        print(f'\n[SUMMARY] Total sent: {self.message_count}, Errors: {self.error_count}')
 97
 98
 99def generate_order_event() -> dict:
100    """주문 이벤트 생성"""
101    products = ['laptop', 'phone', 'tablet', 'headphones', 'keyboard', 'mouse']
102    statuses = ['created', 'confirmed', 'shipped', 'delivered']
103
104    return {
105        'event_type': 'order',
106        'order_id': f'ORD-{random.randint(10000, 99999)}',
107        'customer_id': f'CUST-{random.randint(1, 1000)}',
108        'product': random.choice(products),
109        'quantity': random.randint(1, 5),
110        'amount': round(random.uniform(10, 1000), 2),
111        'status': random.choice(statuses),
112        'timestamp': datetime.now().isoformat(),
113    }
114
115
def generate_clickstream_event() -> dict:
    """Build one randomized clickstream event.

    Returns:
        A dict with the keys ``event_type``, ``event_id``, ``user_id``,
        ``session_id``, ``page``, ``action`` and ``timestamp`` (current local
        time in ISO format).
    """
    site_pages = ('/home', '/products', '/cart', '/checkout', '/profile', '/search')
    user_actions = ('view', 'click', 'scroll', 'hover')

    # Draw the random values one by one, in the order the fields appear.
    event_no = random.randint(100000, 999999)
    user_no = random.randint(1, 500)
    session_no = random.randint(1000, 9999)
    visited = random.choice(site_pages)
    performed = random.choice(user_actions)

    event = {'event_type': 'clickstream'}
    event['event_id'] = f'EVT-{event_no}'
    event['user_id'] = f'USER-{user_no}'
    event['session_id'] = f'SESS-{session_no}'
    event['page'] = visited
    event['action'] = performed
    event['timestamp'] = datetime.now().isoformat()
    return event
130
131
def generate_inventory_event() -> dict:
    """Build one randomized inventory event.

    Returns:
        A dict with the keys ``event_type``, ``product_sku``, ``warehouse``,
        ``quantity_change``, ``current_stock`` and ``timestamp`` (current
        local time in ISO format).
    """
    skus = ('SKU-001', 'SKU-002', 'SKU-003', 'SKU-004', 'SKU-005')
    sites = ('WH-EAST', 'WH-WEST', 'WH-CENTRAL')

    # Draw the random values one by one, in the order the fields appear.
    sku = random.choice(skus)
    site = random.choice(sites)
    delta = random.randint(-50, 100)
    on_hand = random.randint(0, 500)

    event = {'event_type': 'inventory'}
    event['product_sku'] = sku
    event['warehouse'] = site
    event['quantity_change'] = delta
    event['current_stock'] = on_hand
    event['timestamp'] = datetime.now().isoformat()
    return event
145
146
def demo_simple_producer():
    """Simple producer demo.

    Creates ``demo-topic``, sends ten small greeting messages to it using
    three rotating keys, echoes each payload, then closes the producer.
    """
    rule = "=" * 60
    print(rule)
    print("Simple Producer Demo")
    print(rule)

    client = KafkaProducerExample()

    # Make sure the destination topic exists before sending.
    client.create_topic('demo-topic')

    for seq in range(10):
        payload = {
            'id': seq,
            'message': f'Hello Kafka #{seq}',
            'timestamp': datetime.now().isoformat()
        }
        # The key cycles through key-0, key-1, key-2.
        rotating_key = f'key-{seq % 3}'
        client.send_message(topic='demo-topic', key=rotating_key, value=payload)
        print(f'Sent: {payload}')

    client.close()
173
174
def demo_event_stream():
    """Event stream demo.

    Creates the ``orders``, ``clickstream`` and ``inventory`` topics, then
    keeps emitting a random mix of events roughly every 100 ms until the user
    presses Ctrl+C. The producer is always closed (flushed and summarized)
    when the loop ends, including when it ends because of an unexpected
    error.
    """
    print("\n" + "=" * 60)
    print("Event Stream Demo")
    print("=" * 60)

    producer = KafkaProducerExample()

    # Create the topics
    producer.create_topic('orders', num_partitions=3)
    producer.create_topic('clickstream', num_partitions=6)
    producer.create_topic('inventory', num_partitions=3)

    print("\nStreaming events (Press Ctrl+C to stop)...")

    try:
        event_count = 0
        while True:
            # Order events (low frequency)
            if random.random() < 0.3:
                event = generate_order_event()
                producer.send_message('orders', event['customer_id'], event)

            # Clickstream events (high frequency)
            for _ in range(random.randint(1, 5)):
                event = generate_clickstream_event()
                producer.send_message('clickstream', event['user_id'], event)

            # Inventory events (medium frequency)
            if random.random() < 0.5:
                event = generate_inventory_event()
                producer.send_message('inventory', event['product_sku'], event)

            event_count += 1
            if event_count % 50 == 0:
                print(f'Generated {event_count} event batches')

            time.sleep(0.1)  # 100 ms between iterations

    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        # Flush queued messages and print the summary even if the loop was
        # aborted by an error other than Ctrl+C.
        producer.close()
218
219
def demo_batch_producer():
    """Batch producer demo.

    Sends 1000 random order events to ``batch-orders``, waits for the
    producer to flush, and reports the elapsed time and throughput.
    """
    print("\n" + "=" * 60)
    print("Batch Producer Demo")
    print("=" * 60)

    producer = KafkaProducerExample()
    producer.create_topic('batch-orders')

    batch_size = 1000
    print(f"\nSending {batch_size} messages in batch...")

    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    for _ in range(batch_size):
        event = generate_order_event()
        producer.send_message('batch-orders', event['order_id'], event)

    producer.flush()
    elapsed = time.perf_counter() - start_time

    # Guard the division in case the timer reports no elapsed time.
    throughput = batch_size / elapsed if elapsed > 0 else float('inf')

    print("\nBatch completed:")
    print(f"  Messages: {batch_size}")
    print(f"  Time: {elapsed:.2f} seconds")
    print(f"  Throughput: {throughput:.0f} messages/second")

    producer.close()
247
248
def main():
    """Show the demo menu, read the user's choice, and run the selection.

    Entering ``all`` runs every demo in menu order; an unrecognized choice
    falls back to the simple producer demo.
    """
    print("Kafka Producer Examples")
    print("=" * 60)
    print("Make sure Kafka is running on localhost:9092")
    print()

    # Menu key -> (label, demo function)
    demos = {
        '1': ('Simple Producer', demo_simple_producer),
        '2': ('Event Stream', demo_event_stream),
        '3': ('Batch Producer', demo_batch_producer),
    }

    print("Available demos:")
    for number, entry in demos.items():
        label = entry[0]
        print(f"  {number}. {label}")

    choice = input("\nSelect demo (1-3) or 'all': ").strip()

    if choice == 'all':
        for _label, run_demo in demos.values():
            run_demo()
        return

    selected = demos.get(choice)
    if selected is None:
        print("Invalid choice, running simple demo...")
        demo_simple_producer()
        return

    _label, run_demo = selected
    run_demo()
276
277
# Run the interactive demo menu only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()