Kafka Streaming

Overview

Apache Kafka is a distributed event streaming platform used to build real-time data pipelines and streaming applications. It provides high throughput and fault tolerance.


1. Kafka Overview

1.1 Kafka Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                      Kafka Architecture                          β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                  β”‚
β”‚   Producers                         Consumers                    β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚   β”‚Producer1β”‚ β”‚Producer2β”‚          β”‚Consumer1β”‚ β”‚Consumer2β”‚      β”‚
β”‚   β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜          β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜      β”‚
β”‚        β”‚           β”‚                    β”‚           β”‚            β”‚
β”‚        β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜                    β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜            β”‚
β”‚              ↓                                ↑                  β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚   β”‚                    Kafka Cluster                          β”‚  β”‚
β”‚   β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚  β”‚
β”‚   β”‚  β”‚                    Topic: orders                      β”‚β”‚  β”‚
β”‚   β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”‚β”‚  β”‚
β”‚   β”‚  β”‚  β”‚Partition 0 β”‚ β”‚Partition 1 β”‚ β”‚Partition 2 β”‚       β”‚β”‚  β”‚
β”‚   β”‚  β”‚  β”‚ [0,1,2,3]  β”‚ β”‚ [0,1,2]    β”‚ β”‚ [0,1,2,3,4]β”‚       β”‚β”‚  β”‚
β”‚   β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β”‚β”‚  β”‚
β”‚   β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚  β”‚
β”‚   β”‚                                                          β”‚  β”‚
β”‚   β”‚  Broker 1         Broker 2         Broker 3              β”‚  β”‚
β”‚   β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”           β”‚  β”‚
β”‚   β”‚  β”‚ P0(L)    β”‚    β”‚ P1(L)    β”‚    β”‚ P2(L)    β”‚           β”‚  β”‚
β”‚   β”‚  β”‚ P1(R)    β”‚    β”‚ P2(R)    β”‚    β”‚ P0(R)    β”‚           β”‚  β”‚
β”‚   β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜           β”‚  β”‚
β”‚   β”‚                   L=Leader, R=Replica                    β”‚  β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                              ↑                                   β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚   β”‚                    ZooKeeper / KRaft                      β”‚  β”‚
β”‚   β”‚             (ν΄λŸ¬μŠ€ν„° 메타데이터 관리)                      β”‚  β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                                                                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
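The leader/replica layout shown for the three brokers follows a simple rotation. A minimal sketch of that placement idea (illustrative only; a real broker also considers rack awareness and starts from a randomized broker):

```python
def place_replicas(num_partitions: int, brokers: list, replication_factor: int = 2) -> dict:
    """Rotate replica sets across brokers; placement[p][0] is the leader."""
    n = len(brokers)
    return {
        p: [brokers[(p - r) % n] for r in range(replication_factor)]
        for p in range(num_partitions)
    }

# Reproduces the diagram: each broker leads one partition and follows another
print(place_replicas(3, ['Broker1', 'Broker2', 'Broker3']))
# {0: ['Broker1', 'Broker3'], 1: ['Broker2', 'Broker1'], 2: ['Broker3', 'Broker2']}
```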

1.2 Core Concepts

Concept Description
Broker Kafka server that stores and delivers messages
Topic Message category (a logical channel)
Partition Physical split of a topic; unit of parallel processing
Producer Publishes messages
Consumer Consumes messages
Consumer Group Group of consumers that consume cooperatively
Offset Position of a message within a partition
Replication Partition replication for fault tolerance
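To make Topic, Partition, and Offset concrete, here is a toy in-memory model (illustrative only; real brokers persist partitions as on-disk segment files):

```python
from dataclasses import dataclass, field

@dataclass
class Partition:
    """A partition is an append-only log; an offset is just an index into it."""
    records: list = field(default_factory=list)

    def append(self, value: bytes) -> int:
        self.records.append(value)
        return len(self.records) - 1  # offset of the new record

    def read(self, offset: int) -> bytes:
        return self.records[offset]

@dataclass
class Topic:
    """A topic is a named set of partitions."""
    name: str
    partitions: list

topic = Topic('orders', [Partition() for _ in range(3)])
offset = topic.partitions[0].append(b'order-123')
print(offset)                       # 0: the first record lands at offset 0
print(topic.partitions[0].read(0))  # b'order-123'
```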

2. Installation and Setup

2.1 Docker Compose Setup

# docker-compose.yaml
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one for containers on the compose network, one for the host.
      # Advertising only localhost:9092 would break clients inside the network (e.g. kafka-ui).
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  # Kafka UI (optional)
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
# Start the stack
docker-compose up -d

# Create a topic (inside the container)
docker exec -it kafka kafka-topics --create \
    --bootstrap-server localhost:9092 \
    --topic my-topic \
    --partitions 3 \
    --replication-factor 1

2.2 Installing a Python Client

# confluent-kafka (recommended)
pip install confluent-kafka

# kafka-python (alternative)
pip install kafka-python

3. Topics and Partitions

3.1 Topic Management

# Create a topic
kafka-topics --create \
    --bootstrap-server localhost:9092 \
    --topic orders \
    --partitions 6 \
    --replication-factor 3

# List topics
kafka-topics --list --bootstrap-server localhost:9092

# Describe a topic
kafka-topics --describe \
    --bootstrap-server localhost:9092 \
    --topic orders

# Delete a topic
kafka-topics --delete \
    --bootstrap-server localhost:9092 \
    --topic orders

# Increase the partition count (decreasing is not possible)
kafka-topics --alter \
    --bootstrap-server localhost:9092 \
    --topic orders \
    --partitions 12

3.2 Partition Strategy

"""
Partition selection strategy:
1. With a key: hash(key) % number of partitions
2. Without a key: round-robin (newer clients default to sticky partitioning)

Factors in choosing a partition count:
- Expected throughput / throughput of a single partition
- Number of consumers (partitions >= consumers)
- Disk I/O
"""

# Partition count guidelines
"""
- Assume roughly 100 MB/s per partition
- Need 1 GB/s -> at least 10 partitions
- Allow for consumer scaling -> 2-3x the expected number of consumers

Caveats:
- Too many partitions -> slower leader elections, higher memory usage
- Too few partitions -> limited parallelism
"""

4. Producer

4.1 Basic Producer

from confluent_kafka import Producer
import json

# Producer configuration
config = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer',
    'acks': 'all',  # wait for acknowledgment from all in-sync replicas
}

producer = Producer(config)

# Delivery report callback
def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Send a message
def send_message(topic: str, key: str, value: dict):
    producer.produce(
        topic=topic,
        key=key.encode('utf-8'),
        value=json.dumps(value).encode('utf-8'),
        callback=delivery_callback
    )
    # Flush the buffer (blocks until delivery; fine for demos,
    # but avoid flushing per message in production)
    producer.flush()

# Usage example
send_message(
    topic='orders',
    key='order-123',
    value={
        'order_id': 'order-123',
        'customer_id': 'cust-456',
        'amount': 99.99,
        'timestamp': '2024-01-15T10:30:00Z'
    }
)

4.2 High-Throughput Producer

from confluent_kafka import Producer
import json
import time

class HighThroughputProducer:
    """κ³ μ²˜λ¦¬λŸ‰ Producer"""

    def __init__(self, bootstrap_servers: str):
        self.config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'high-throughput-producer',

            # Performance settings
            'acks': '1',                          # leader-only ack (fast, weaker durability)
            'linger.ms': 5,                       # batching delay
            'batch.size': 16384,                  # batch size (16 KB)
            'queue.buffering.max.kbytes': 32768,  # buffer memory (32 MB; librdkafka's
                                                  # equivalent of Java's buffer.memory)
            'compression.type': 'snappy',         # compression

            # Retry settings
            'retries': 3,
            'retry.backoff.ms': 100,
        }
        self.producer = Producer(self.config)
        self.message_count = 0

    def send(self, topic: str, key: str, value: dict):
        """비동기 전솑"""
        self.producer.produce(
            topic=topic,
            key=key.encode('utf-8') if key else None,
            value=json.dumps(value).encode('utf-8'),
            callback=self._on_delivery
        )
        self.message_count += 1

        # Poll periodically to serve delivery callbacks
        if self.message_count % 1000 == 0:
            self.producer.poll(0)

    def _on_delivery(self, err, msg):
        if err:
            print(f'Delivery failed: {err}')

    def flush(self):
        """λͺ¨λ“  λ©”μ‹œμ§€ 전솑 μ™„λ£Œ λŒ€κΈ°"""
        self.producer.flush()

    def close(self):
        self.flush()


# Bulk send example
producer = HighThroughputProducer('localhost:9092')

start = time.time()
for i in range(100000):
    producer.send(
        topic='events',
        key=f'key-{i % 100}',
        value={'event_id': i, 'data': 'test'}
    )

producer.flush()
print(f'Sent 100,000 messages in {time.time() - start:.2f} seconds')

5. Consumer

5.1 Basic Consumer

from confluent_kafka import Consumer
import json

# Consumer configuration
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',  # read from the beginning if no committed offset
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000,
}

consumer = Consumer(config)

# Subscribe to the topic
consumer.subscribe(['orders'])

# Consume messages
try:
    while True:
        msg = consumer.poll(timeout=1.0)  # wait up to 1 second

        if msg is None:
            continue

        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue

        # Process the message
        key = msg.key().decode('utf-8') if msg.key() else None
        value = json.loads(msg.value().decode('utf-8'))

        print(f'Received: topic={msg.topic()}, partition={msg.partition()}, '
              f'offset={msg.offset()}, key={key}, value={value}')

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

5.2 μˆ˜λ™ 컀밋

from confluent_kafka import Consumer
import json

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'manual-commit-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # disable auto commit
}

consumer = Consumer(config)
consumer.subscribe(['orders'])

def process_message(value: dict) -> bool:
    """λ©”μ‹œμ§€ 처리 둜직"""
    try:
        # actual business logic goes here
        print(f"Processing: {value}")
        return True
    except Exception as e:
        print(f"Processing failed: {e}")
        return False

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            continue

        value = json.loads(msg.value().decode('utf-8'))

        # Commit only if processing succeeded
        if process_message(value):
            consumer.commit(msg)  # commit this message's offset
            # or consumer.commit() to commit the current offsets
        else:
            print("Message processing failed, not committing")

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

6. Consumer Group

6.1 Consumer Group Concept

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Consumer Group Behavior                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                β”‚
β”‚   Topic: orders (6 partitions)                                 β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”      β”‚
β”‚   β”‚ P0   β”‚ β”‚ P1   β”‚ β”‚ P2   β”‚ β”‚ P3   β”‚ β”‚ P4   β”‚ β”‚ P5   β”‚      β”‚
β”‚   β””β”€β”€β”¬β”€β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”€β”˜      β”‚
β”‚      β”‚        β”‚        β”‚        β”‚        β”‚        β”‚           β”‚
β”‚   Consumer Group A (3 consumers)                               β”‚
β”‚      β”‚        β”‚        β”‚        β”‚        β”‚        β”‚           β”‚
β”‚      ↓        ↓        ↓        ↓        ↓        ↓           β”‚
β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”‚
β”‚   β”‚ Consumer 1  β”‚  β”‚ Consumer 2  β”‚  β”‚ Consumer 3  β”‚          β”‚
β”‚   β”‚  P0, P1     β”‚  β”‚  P2, P3     β”‚  β”‚  P4, P5     β”‚          β”‚
β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜          β”‚
β”‚                                                                β”‚
β”‚   Each partition is assigned to one consumer in the group    β”‚
β”‚                                                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
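The assignment in the diagram (two consecutive partitions per consumer) is what range-style assignment produces. A sketch of the idea, not the client's actual implementation:

```python
def range_assign(partitions, consumers):
    """Range-style assignment: contiguous chunks of partitions per consumer."""
    n, k = len(partitions), len(consumers)
    per, extra = divmod(n, k)
    assignment, start = {}, 0
    for i, c in enumerate(consumers):
        count = per + (1 if i < extra else 0)  # first `extra` consumers get one more
        assignment[c] = partitions[start:start + count]
        start += count
    return assignment

print(range_assign(list(range(6)), ['C1', 'C2', 'C3']))
# {'C1': [0, 1], 'C2': [2, 3], 'C3': [4, 5]}
```

Note that with more consumers than partitions, the extra consumers would receive empty assignments and sit idle, which is why the guideline above asks for partitions >= consumers.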

6.2 Rebalancing

from confluent_kafka import Consumer

def on_assign(consumer, partitions):
    """νŒŒν‹°μ…˜ ν• λ‹Ή 콜백"""
    print(f"Partitions assigned: {[p.partition for p in partitions]}")

def on_revoke(consumer, partitions):
    """νŒŒν‹°μ…˜ ν•΄μ œ 콜백"""
    print(f"Partitions revoked: {[p.partition for p in partitions]}")
    # Commit offsets for messages processed so far
    consumer.commit()

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'partition.assignment.strategy': 'cooperative-sticky',  # incremental rebalancing
}

consumer = Consumer(config)
consumer.subscribe(
    ['orders'],
    on_assign=on_assign,
    on_revoke=on_revoke
)

6.3 Consumer Group Monitoring

# List consumer groups
kafka-consumer-groups --list --bootstrap-server localhost:9092

# Describe a consumer group (shows offsets and lag)
kafka-consumer-groups --describe \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group

# Example output:
# GROUP           TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# my-group        orders   0          1500            1550            50
# my-group        orders   1          1200            1200            0

# Member details (which consumer owns which partitions)
kafka-consumer-groups --describe \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --members
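LAG in the describe output is simply LOG-END-OFFSET minus CURRENT-OFFSET. A small helper for summing it across partitions (illustrative; the tuple layout mirrors the example output above):

```python
def total_lag(partitions):
    """Sum consumer lag across partitions.

    Each entry mirrors a row of `kafka-consumer-groups --describe`:
    (partition, current_offset, log_end_offset).
    """
    return sum(end - current for _, current, end in partitions)

# Rows from the example output above
rows = [(0, 1500, 1550), (1, 1200, 1200)]
print(total_lag(rows))  # 50; lag that keeps growing means consumers can't keep up
```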

7. Real-Time Data Processing Patterns

7.1 Event-Driven Processing

from confluent_kafka import Consumer, Producer
import json

class EventProcessor:
    """이벀트 기반 처리 νŒŒμ΄ν”„λΌμΈ"""

    def __init__(self, bootstrap_servers: str, group_id: str):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
        })
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
        })

    def process_and_forward(
        self,
        source_topic: str,
        target_topic: str,
        transform_func
    ):
        """λ©”μ‹œμ§€ 처리 ν›„ λ‹€λ₯Έ ν† ν”½μœΌλ‘œ 전달"""
        self.consumer.subscribe([source_topic])

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    continue

                # Transform
                value = json.loads(msg.value().decode('utf-8'))
                transformed = transform_func(value)

                if transformed:
                    # λ‹€μŒ ν† ν”½μœΌλ‘œ 전달
                    self.producer.produce(
                        topic=target_topic,
                        key=msg.key(),
                        value=json.dumps(transformed).encode('utf-8')
                    )
                    self.producer.poll(0)

                # Commit
                self.consumer.commit(msg)

        except KeyboardInterrupt:
            pass
        finally:
            self.producer.flush()
            self.consumer.close()


# Usage example: transform order events into shipment events
def order_to_shipment(order: dict) -> dict:
    """μ£Όλ¬Έ 이벀트λ₯Ό 배솑 이벀트둜 λ³€ν™˜"""
    return {
        'shipment_id': f"ship-{order['order_id']}",
        'order_id': order['order_id'],
        'customer_id': order['customer_id'],
        'status': 'pending',
        'created_at': order['timestamp']
    }

processor = EventProcessor('localhost:9092', 'order-processor')
processor.process_and_forward('orders', 'shipments', order_to_shipment)

7.2 Aggregation (Windowing)

from confluent_kafka import Consumer
from collections import defaultdict
from datetime import datetime, timedelta
import json
import threading
import time

class WindowedAggregator:
    """μ‹œκ°„ μœˆλ„μš° 기반 집계"""

    def __init__(self, window_size_seconds: int = 60):
        self.window_size = window_size_seconds
        self.windows = defaultdict(lambda: defaultdict(int))
        self.lock = threading.Lock()

    def add(self, key: str, value: int, timestamp: datetime):
        """κ°’ μΆ”κ°€"""
        window_start = self._get_window_start(timestamp)
        with self.lock:
            self.windows[window_start][key] += value

    def _get_window_start(self, timestamp: datetime) -> datetime:
        """μœˆλ„μš° μ‹œμž‘ μ‹œκ°„ 계산"""
        seconds = int(timestamp.timestamp())
        window_start_seconds = (seconds // self.window_size) * self.window_size
        return datetime.fromtimestamp(window_start_seconds)

    def get_and_clear_completed_windows(self) -> dict:
        """μ™„λ£Œλœ μœˆλ„μš° κ²°κ³Ό λ°˜ν™˜"""
        current_window = self._get_window_start(datetime.now())
        completed = {}

        with self.lock:
            for window_start, data in list(self.windows.items()):
                if window_start < current_window:
                    completed[window_start] = dict(data)
                    del self.windows[window_start]

        return completed


# Usage example: per-minute sales count by category
aggregator = WindowedAggregator(window_size_seconds=60)

def process_sales():
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'sales-aggregator',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['sales'])

    while True:
        msg = consumer.poll(timeout=1.0)
        if msg and not msg.error():
            value = json.loads(msg.value().decode('utf-8'))
            aggregator.add(
                key=value['category'],
                value=1,
                timestamp=datetime.fromisoformat(value['timestamp'])
            )

        # μ™„λ£Œλœ μœˆλ„μš° 좜λ ₯
        completed = aggregator.get_and_clear_completed_windows()
        for window, data in completed.items():
            print(f"Window {window}: {data}")

8. Kafka Streams and Alternatives

8.1 Faust (Python Kafka Streams)

import faust
from datetime import datetime

# Create the Faust app
app = faust.App(
    'myapp',
    broker='kafka://localhost:9092',
    value_serializer='json',
)

# ν† ν”½ μ •μ˜
orders_topic = app.topic('orders', value_type=dict)
processed_topic = app.topic('processed_orders', value_type=dict)

# Stream processing agent
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        # Processing logic
        processed = {
            **order,
            'processed': True,
            'processed_at': str(datetime.now())
        }
        # Send to another topic
        await processed_topic.send(value=processed)

# ν…Œμ΄λΈ” (μƒνƒœ μ €μž₯)
order_counts = app.Table('order_counts', default=int)

@app.agent(orders_topic)
async def count_orders(orders):
    async for order in orders:
        customer_id = order['customer_id']
        order_counts[customer_id] += 1

# Run: faust -A myapp worker

Exercises

Exercise 1: Producer/Consumer

Write a producer that generates order events and a consumer that consumes them.

Exercise 2: Consumer Group

Create a consumer group with three consumers and verify the partition assignment.

Exercise 3: Real-Time Aggregation

Write a streaming application that computes total sales per minute from a stream of sales events.


Summary

Concept Description
Topic Logical category of messages
Partition Physical split of a topic; unit of parallel processing
Producer Publishes messages
Consumer Consumes messages
Consumer Group Set of consumers that consume cooperatively
Offset Position of a message within a partition
