# Kafka Streaming

## Overview

Apache Kafka is a distributed event streaming platform used to build real-time data pipelines and streaming applications. It provides high throughput and fault tolerance.

## 1. Kafka Overview

### 1.1 Kafka Architecture
```
+--------------------------------------------------------------------+
|                         Kafka Architecture                         |
+--------------------------------------------------------------------+
|                                                                    |
|   Producers                              Consumers                 |
|   +---------+  +---------+        +---------+  +---------+         |
|   |Producer1|  |Producer2|        |Consumer1|  |Consumer2|         |
|   +----+----+  +----+----+        +----+----+  +----+----+         |
|        |            |                  |            |              |
|        +-----+------+                  +-----+------+              |
|              |                               |                     |
|   +----------------------------------------------------------+    |
|   |                      Kafka Cluster                       |    |
|   |  +----------------------------------------------------+  |    |
|   |  | Topic: orders                                      |  |    |
|   |  |  +-------------+  +-------------+  +-------------+ |  |    |
|   |  |  | Partition 0 |  | Partition 1 |  | Partition 2 | |  |    |
|   |  |  |  [0,1,2,3]  |  |   [0,1,2]   |  | [0,1,2,3,4] | |  |    |
|   |  |  +-------------+  +-------------+  +-------------+ |  |    |
|   |  +----------------------------------------------------+  |    |
|   |                                                          |    |
|   |   Broker 1        Broker 2        Broker 3               |    |
|   |  +--------+      +--------+      +--------+              |    |
|   |  | P0 (L) |      | P1 (L) |      | P2 (L) |              |    |
|   |  | P1 (R) |      | P2 (R) |      | P0 (R) |              |    |
|   |  +--------+      +--------+      +--------+              |    |
|   |   L = Leader, R = Replica                                |    |
|   +----------------------------------------------------------+    |
|                                                                    |
|   +----------------------------------------------------------+    |
|   |                    ZooKeeper / KRaft                     |    |
|   |              (cluster metadata management)               |    |
|   +----------------------------------------------------------+    |
|                                                                    |
+--------------------------------------------------------------------+
```
### 1.2 Core Concepts

| Concept | Description |
|---|---|
| Broker | Kafka server; stores and serves messages |
| Topic | Message category (logical channel) |
| Partition | Physical division of a topic; enables parallel processing |
| Producer | Publishes messages |
| Consumer | Consumes messages |
| Consumer Group | Group of consumers that cooperate to consume a topic |
| Offset | Position of a message within a partition |
| Replication | Fault tolerance through partition replicas |
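The Partition and Offset rows above describe an append-only log: each record gets a sequential position, and a consumer resumes reading from the last offset it committed. A toy model (plain Python, no broker; a real partition is a segmented on-disk log):

```python
class PartitionLog:
    """Toy model of a Kafka partition: an append-only list of records."""

    def __init__(self):
        self.records = []

    def append(self, value) -> int:
        """Append a record and return its offset (its position in the log)."""
        self.records.append(value)
        return len(self.records) - 1

    def read_from(self, offset: int):
        """Read all records at or after the given offset."""
        return self.records[offset:]

log = PartitionLog()
for v in ["a", "b", "c"]:
    log.append(v)

assert log.append("d") == 3            # offsets are assigned sequentially
assert log.read_from(2) == ["c", "d"]  # a consumer resumes from its offset
```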
## 2. Installation and Setup

### 2.1 Docker Compose Setup

```yaml
# docker-compose.yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  # Kafka UI (optional)
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
```

```bash
# Start the stack
docker-compose up -d

# Create a topic (from inside the container)
docker exec -it kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 1
```
### 2.2 Installing a Python Client

```bash
# confluent-kafka (recommended)
pip install confluent-kafka

# kafka-python (alternative)
pip install kafka-python
```
## 3. Topics and Partitions

### 3.1 Topic Management

```bash
# Create a topic
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --partitions 6 \
  --replication-factor 3

# List topics
kafka-topics --list --bootstrap-server localhost:9092

# Describe a topic
kafka-topics --describe \
  --bootstrap-server localhost:9092 \
  --topic orders

# Delete a topic
kafka-topics --delete \
  --bootstrap-server localhost:9092 \
  --topic orders

# Increase the partition count (it cannot be decreased)
kafka-topics --alter \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --partitions 12
```
### 3.2 Partition Strategy

```python
"""
Partition selection strategy:
1. If the message has a key: hash(key) % partitions
2. If there is no key: round-robin

Factors when deciding the partition count:
- Target throughput / single-partition throughput
- Number of consumers (partitions >= consumers)
- Disk I/O
"""
```
```python
# Recommended partition counts
"""
- Assume roughly 100 MB/s of throughput per partition
- Need 1 GB/s -> at least 10 partitions
- Allow for consumer scaling -> 2-3x the expected number of consumers

Caution:
- Too many partitions -> slower leader elections, higher memory usage
- Too few partitions -> limited parallelism
"""
```
## 4. Producer

### 4.1 Basic Producer

```python
from confluent_kafka import Producer
import json

# Producer configuration
config = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer',
    'acks': 'all',  # wait for all in-sync replicas to acknowledge
}

producer = Producer(config)

# Delivery confirmation callback
def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Send a message
def send_message(topic: str, key: str, value: dict):
    producer.produce(
        topic=topic,
        key=key.encode('utf-8'),
        value=json.dumps(value).encode('utf-8'),
        callback=delivery_callback
    )
    # Flush the buffer (wait for the async send to complete).
    # Fine for an example; flushing per message hurts throughput in production.
    producer.flush()

# Usage example
send_message(
    topic='orders',
    key='order-123',
    value={
        'order_id': 'order-123',
        'customer_id': 'cust-456',
        'amount': 99.99,
        'timestamp': '2024-01-15T10:30:00Z'
    }
)
```
### 4.2 High-Throughput Producer

```python
from confluent_kafka import Producer
import json
import time

class HighThroughputProducer:
    """Producer tuned for throughput."""

    def __init__(self, bootstrap_servers: str):
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'high-throughput-producer',
            # Performance settings
            'acks': '1',                          # leader-only ack (faster, less durable)
            'linger.ms': 5,                       # wait up to 5 ms to fill a batch
            'batch.size': 16384,                  # batch size (16 KB)
            'queue.buffering.max.kbytes': 32768,  # client buffer (32 MB); librdkafka's
                                                  # equivalent of the Java client's buffer.memory
            'compression.type': 'snappy',         # compress batches
            # Retry settings
            'retries': 3,
            'retry.backoff.ms': 100,
        }
        self.producer = Producer(self.config)
        self.message_count = 0

    def send(self, topic: str, key: str, value: dict):
        """Asynchronous send."""
        self.producer.produce(
            topic=topic,
            key=key.encode('utf-8') if key else None,
            value=json.dumps(value).encode('utf-8'),
            callback=self._on_delivery
        )
        self.message_count += 1
        # Poll periodically to serve delivery callbacks
        if self.message_count % 1000 == 0:
            self.producer.poll(0)

    def _on_delivery(self, err, msg):
        if err:
            print(f'Delivery failed: {err}')

    def flush(self):
        """Wait for all outstanding messages to be delivered."""
        self.producer.flush()

    def close(self):
        self.flush()

# Bulk send example
producer = HighThroughputProducer('localhost:9092')
start = time.time()
for i in range(100000):
    producer.send(
        topic='events',
        key=f'key-{i % 100}',
        value={'event_id': i, 'data': 'test'}
    )
producer.flush()
print(f'Sent 100,000 messages in {time.time() - start:.2f} seconds')
```
## 5. Consumer

### 5.1 Basic Consumer

```python
from confluent_kafka import Consumer
import json

# Consumer configuration
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',  # start from the beginning if no committed offset
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000,
}

consumer = Consumer(config)

# Subscribe to a topic
consumer.subscribe(['orders'])

# Consume messages
try:
    while True:
        msg = consumer.poll(timeout=1.0)  # wait up to 1 second
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue

        # Process the message
        key = msg.key().decode('utf-8') if msg.key() else None
        value = json.loads(msg.value().decode('utf-8'))
        print(f'Received: topic={msg.topic()}, partition={msg.partition()}, '
              f'offset={msg.offset()}, key={key}, value={value}')
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
### 5.2 Manual Commit

```python
from confluent_kafka import Consumer
import json

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'manual-commit-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # disable auto commit
}

consumer = Consumer(config)
consumer.subscribe(['orders'])

def process_message(value: dict) -> bool:
    """Message processing logic."""
    try:
        # Actual business logic goes here
        print(f"Processing: {value}")
        return True
    except Exception as e:
        print(f"Processing failed: {e}")
        return False

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            continue

        value = json.loads(msg.value().decode('utf-8'))

        # Commit only after successful processing
        if process_message(value):
            consumer.commit(msg)  # commit this specific message
            # or consumer.commit() to commit up to the current offsets
        else:
            print("Message processing failed, not committing")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
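Committing after every message is safe but adds a broker round trip per record. A common middle ground is committing every N messages; if the consumer crashes mid-batch, the uncommitted messages are reprocessed, so processing should be idempotent. A broker-free sketch of just the decision logic (the `BatchCommitter` name is illustrative, not a library class):

```python
class BatchCommitter:
    """Counts processed messages and signals when a commit is due.
    In real code, call consumer.commit() whenever record() returns True."""

    def __init__(self, every_n: int = 100):
        self.every_n = every_n
        self.since_commit = 0
        self.commits = 0

    def record(self) -> bool:
        """Count one processed message; return True when a commit is due."""
        self.since_commit += 1
        if self.since_commit >= self.every_n:
            self.since_commit = 0
            self.commits += 1
            return True
        return False

bc = BatchCommitter(every_n=3)
results = [bc.record() for _ in range(7)]
assert results == [False, False, True, False, False, True, False]
assert bc.commits == 2  # one message left uncommitted until the next commit point
```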
## 6. Consumer Groups

### 6.1 Consumer Group Concept

```
+-----------------------------------------------------------------+
|                     Consumer Group Behavior                     |
+-----------------------------------------------------------------+
|                                                                 |
|   Topic: orders (6 partitions)                                  |
|   +----+  +----+  +----+  +----+  +----+  +----+                |
|   | P0 |  | P1 |  | P2 |  | P3 |  | P4 |  | P5 |                |
|   +--+-+  +--+-+  +--+-+  +--+-+  +--+-+  +--+-+                |
|      |       |       |       |       |       |                  |
|   Consumer Group A (3 consumers)                                |
|      |       |       |       |       |       |                  |
|   +--------------+  +--------------+  +--------------+          |
|   |  Consumer 1  |  |  Consumer 2  |  |  Consumer 3  |          |
|   |    P0, P1    |  |    P2, P3    |  |    P4, P5    |          |
|   +--------------+  +--------------+  +--------------+          |
|                                                                 |
|   Each partition is assigned to exactly one consumer            |
|   within the group                                              |
|                                                                 |
+-----------------------------------------------------------------+
```
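The assignment in the diagram (P0-P1, P2-P3, P4-P5) is the kind of contiguous split the default range assignor produces. A simplified simulation of that logic (illustrative; the real assignor works per topic and also handles group membership changes):

```python
def range_assign(partitions: int, consumers: list) -> dict:
    """Simplified range assignment: give each consumer a contiguous
    range of partitions; the first consumers absorb any remainder."""
    n = len(consumers)
    per, extra = divmod(partitions, n)
    assignment, start = {}, 0
    for i, c in enumerate(sorted(consumers)):
        count = per + (1 if i < extra else 0)
        assignment[c] = list(range(start, start + count))
        start += count
    return assignment

# 6 partitions over 3 consumers, as in the diagram above
assert range_assign(6, ["c1", "c2", "c3"]) == {
    "c1": [0, 1], "c2": [2, 3], "c3": [4, 5]
}
# Uneven split: 5 partitions over 2 consumers
assert range_assign(5, ["c1", "c2"]) == {"c1": [0, 1, 2], "c2": [3, 4]}
```

A consequence worth noticing: with more consumers than partitions, the extras get nothing, which is why the partition count caps a group's parallelism.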
### 6.2 Rebalancing

```python
from confluent_kafka import Consumer

def on_assign(consumer, partitions):
    """Partition assignment callback."""
    print(f"Partitions assigned: {[p.partition for p in partitions]}")

def on_revoke(consumer, partitions):
    """Partition revocation callback."""
    print(f"Partitions revoked: {[p.partition for p in partitions]}")
    # Commit offsets for messages processed so far
    consumer.commit()

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'partition.assignment.strategy': 'cooperative-sticky',  # incremental rebalancing
}

consumer = Consumer(config)
consumer.subscribe(
    ['orders'],
    on_assign=on_assign,
    on_revoke=on_revoke
)
```
### 6.3 Monitoring Consumer Groups

```bash
# List consumer groups
kafka-consumer-groups --list --bootstrap-server localhost:9092

# Describe a consumer group (includes per-partition lag)
kafka-consumer-groups --describe \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group

# Example output:
# GROUP     TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# my-group  orders  0          1500            1550            50
# my-group  orders  1          1200            1200            0

# Member details (which consumer owns which partitions)
kafka-consumer-groups --describe \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group \
  --members
```
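The LAG column above is simply LOG-END-OFFSET minus CURRENT-OFFSET for each partition. A tiny helper that computes it from two offset maps:

```python
def consumer_lag(current_offsets: dict, end_offsets: dict) -> dict:
    """Per-partition lag: how far the group trails the end of the log.
    Partitions with no committed offset are treated as starting from 0."""
    return {p: end_offsets[p] - current_offsets.get(p, 0)
            for p in end_offsets}

# Matches the example output above
lag = consumer_lag({0: 1500, 1: 1200}, {0: 1550, 1: 1200})
assert lag == {0: 50, 1: 0}
assert sum(lag.values()) == 50  # total lag across the topic
```

Total lag growing over time means consumers cannot keep up with producers, which is the usual trigger for adding consumers or partitions.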
## 7. Real-Time Data Processing Patterns

### 7.1 Event-Driven Processing

```python
from confluent_kafka import Consumer, Producer
import json

class EventProcessor:
    """Event-driven processing pipeline."""

    def __init__(self, bootstrap_servers: str, group_id: str):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
        })
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
        })

    def process_and_forward(
        self,
        source_topic: str,
        target_topic: str,
        transform_func
    ):
        """Process each message, then forward the result to another topic."""
        self.consumer.subscribe([source_topic])
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    continue

                # Transform
                value = json.loads(msg.value().decode('utf-8'))
                transformed = transform_func(value)

                if transformed:
                    # Forward to the target topic
                    self.producer.produce(
                        topic=target_topic,
                        key=msg.key(),
                        value=json.dumps(transformed).encode('utf-8')
                    )
                    self.producer.poll(0)

                # Commit
                self.consumer.commit(msg)
        except KeyboardInterrupt:
            pass
        finally:
            self.producer.flush()
            self.consumer.close()

# Usage example: transform order events into shipment events
def order_to_shipment(order: dict) -> dict:
    """Convert an order event into a shipment event."""
    return {
        'shipment_id': f"ship-{order['order_id']}",
        'order_id': order['order_id'],
        'customer_id': order['customer_id'],
        'status': 'pending',
        'created_at': order['timestamp']
    }

processor = EventProcessor('localhost:9092', 'order-processor')
processor.process_and_forward('orders', 'shipments', order_to_shipment)
```
### 7.2 Aggregation (Windowing)

```python
from confluent_kafka import Consumer
from collections import defaultdict
from datetime import datetime
import json
import threading

class WindowedAggregator:
    """Time-window-based aggregation."""

    def __init__(self, window_size_seconds: int = 60):
        self.window_size = window_size_seconds
        self.windows = defaultdict(lambda: defaultdict(int))
        self.lock = threading.Lock()

    def add(self, key: str, value: int, timestamp: datetime):
        """Add a value to the window containing the timestamp."""
        window_start = self._get_window_start(timestamp)
        with self.lock:
            self.windows[window_start][key] += value

    def _get_window_start(self, timestamp: datetime) -> datetime:
        """Compute the start time of the window."""
        seconds = int(timestamp.timestamp())
        window_start_seconds = (seconds // self.window_size) * self.window_size
        return datetime.fromtimestamp(window_start_seconds)

    def get_and_clear_completed_windows(self) -> dict:
        """Return the results of completed windows and drop them."""
        current_window = self._get_window_start(datetime.now())
        completed = {}
        with self.lock:
            for window_start, data in list(self.windows.items()):
                if window_start < current_window:
                    completed[window_start] = dict(data)
                    del self.windows[window_start]
        return completed

# Usage example: per-category sales counts per minute
aggregator = WindowedAggregator(window_size_seconds=60)

def process_sales():
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'sales-aggregator',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['sales'])

    while True:
        msg = consumer.poll(timeout=1.0)
        if msg and not msg.error():
            value = json.loads(msg.value().decode('utf-8'))
            aggregator.add(
                key=value['category'],
                value=1,
                timestamp=datetime.fromisoformat(value['timestamp'])
            )

        # Print completed windows
        completed = aggregator.get_and_clear_completed_windows()
        for window, data in completed.items():
            print(f"Window {window}: {data}")
```
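The window bucketing in `_get_window_start` reduces to integer division of the epoch timestamp, which is what makes the windows tumbling (fixed-size and non-overlapping). The boundary behavior can be checked in isolation:

```python
from datetime import datetime

def window_start(ts: datetime, window_size: int) -> datetime:
    """Align a timestamp down to the start of its tumbling window."""
    seconds = int(ts.timestamp())
    return datetime.fromtimestamp((seconds // window_size) * window_size)

w = 60
t1 = datetime(2024, 1, 15, 10, 30, 5)
t2 = datetime(2024, 1, 15, 10, 30, 59)
t3 = datetime(2024, 1, 15, 10, 31, 0)

assert window_start(t1, w) == window_start(t2, w)  # same one-minute window
assert window_start(t3, w) != window_start(t1, w)  # next window starts at :31:00
assert window_start(t1, w) == datetime(2024, 1, 15, 10, 30, 0)
```

Because every timestamp in a window maps to the same start time, the start time works as a dictionary key, which is exactly how `WindowedAggregator.windows` groups its counts.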
## 8. Kafka Streams and Alternatives

### 8.1 Faust (Kafka Streams for Python)

```python
import faust
from datetime import datetime

# Create the Faust app
app = faust.App(
    'myapp',
    broker='kafka://localhost:9092',
    value_serializer='json',
)

# Define topics
orders_topic = app.topic('orders', value_type=dict)
processed_topic = app.topic('processed_orders', value_type=dict)

# Stream-processing agent
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        # Processing logic
        processed = {
            **order,
            'processed': True,
            'processed_at': str(datetime.now())
        }
        # Send to another topic
        await processed_topic.send(value=processed)

# Table (stateful storage)
order_counts = app.Table('order_counts', default=int)

@app.agent(orders_topic)
async def count_orders(orders):
    async for order in orders:
        customer_id = order['customer_id']
        order_counts[customer_id] += 1

# Run with: faust -A myapp worker
```
## Exercises

### Exercise 1: Producer/Consumer

Write a Producer that publishes order events and a Consumer that consumes them.

### Exercise 2: Consumer Group

Create a Consumer Group with three Consumers and verify the partition assignment.

### Exercise 3: Real-Time Aggregation

Write a streaming application that computes total revenue per minute from real-time sales events.
## Summary

| Concept | Description |
|---|---|
| Topic | Logical category of messages |
| Partition | Physical division of a topic; the unit of parallelism |
| Producer | Publishes messages |
| Consumer | Consumes messages |
| Consumer Group | Set of consumers that consume cooperatively |
| Offset | Position of a message within a partition |