Spark SQL μ΅œμ ν™”

Spark SQL μ΅œμ ν™”

κ°œμš”

Spark SQL의 μ„±λŠ₯을 μ΅œμ ν™”ν•˜κΈ° μœ„ν•΄μ„œλŠ” Catalyst μ˜΅ν‹°λ§ˆμ΄μ €μ˜ λ™μž‘ 원리λ₯Ό μ΄ν•΄ν•˜κ³ , νŒŒν‹°μ…”λ‹, 캐싱, 쑰인 μ „λž΅ 등을 적절히 ν™œμš©ν•΄μ•Ό ν•©λ‹ˆλ‹€.


1. Catalyst Optimizer

1.1 Understanding Execution Plans

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Optimization").getOrCreate()

df = spark.read.parquet("sales.parquet")

# μ‹€ν–‰ κ³„νš 확인
query = df.filter(col("amount") > 100) \
          .groupBy("category") \
          .sum("amount")

# 논리적 κ³„νš
query.explain(mode="simple")

# 전체 κ³„νš (논리적 + 물리적)
query.explain(mode="extended")

# λΉ„μš© 기반 κ³„νš
query.explain(mode="cost")

# ν˜•μ‹ν™”λœ 좜λ ₯
query.explain(mode="formatted")

1.2 Catalyst Optimization Phases

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                  Catalyst Optimizer Phases                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                             β”‚
β”‚   1. Analysis                                               β”‚
β”‚      - Resolve column/table names                           β”‚
β”‚      - Type checking                                        β”‚
β”‚      ↓                                                      β”‚
β”‚   2. Logical Optimization                                   β”‚
β”‚      - Predicate Pushdown                                   β”‚
β”‚      - Column Pruning                                       β”‚
β”‚      - Constant Folding                                     β”‚
β”‚      ↓                                                      β”‚
β”‚   3. Physical Planning                                      β”‚
β”‚      - Join strategy selection                              β”‚
β”‚      - Aggregation strategy selection                       β”‚
β”‚      ↓                                                      β”‚
β”‚   4. Code Generation                                        β”‚
β”‚      - Whole-Stage Code Generation                          β”‚
β”‚      - JIT compilation                                      β”‚
β”‚                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

1.3 Key Optimization Techniques

# 1. Predicate Pushdown
# Push filters down to the data source level
df = spark.read.parquet("data.parquet")
filtered = df.filter(col("date") == "2024-01-01")  # filtered while reading the Parquet files

# 2. Column Pruning
# Read only the columns that are needed
df.select("name", "amount")  # other columns are never read

# 3. Pushdown for external sources (JDBC)
# pushDownPredicate (true by default) lets filters run inside the database;
# projection pushdown happens automatically when you select() columns
df = spark.read.format("jdbc") \
    .option("pushDownPredicate", "true") \
    .load()

# 4. Constant Folding
# Constant expressions are evaluated once at planning time
df.filter(col("value") > 1 + 2)  # rewritten to > 3

2. Partitioning

2.1 Partition Basics

# Check the number of partitions
df.rdd.getNumPartitions()

# Redistribute partitions
df.repartition(100)                      # to 100 partitions
df.repartition("date")                   # partition by column
df.repartition(100, "date", "category")  # column(s) + explicit count

# Reduce the partition count
df.coalesce(10)  # merges partitions without a full shuffle

# Inspect partition contents
# (glom().collect() pulls all rows to the driver -- only use on small data)
def print_partition_info(df):
    print(f"Partitions: {df.rdd.getNumPartitions()}")
    for idx, partition in enumerate(df.rdd.glom().collect()):
        print(f"Partition {idx}: {len(partition)} rows")

2.2 Partitioning Strategies

# μ μ ˆν•œ νŒŒν‹°μ…˜ 수 계산
"""
ꢌμž₯ 곡식:
- νŒŒν‹°μ…˜ 수 = 데이터 크기(MB) / 128MB
- λ˜λŠ”: ν΄λŸ¬μŠ€ν„° μ½”μ–΄ 수 * 2~4

μ˜ˆμ‹œ:
- 10GB 데이터 β†’ 10,000MB / 128MB β‰ˆ 80 νŒŒν‹°μ…˜
- 100 μ½”μ–΄ ν΄λŸ¬μŠ€ν„° β†’ 200~400 νŒŒν‹°μ…˜
"""
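The rules of thumb above can be expressed as a small helper. This is only a sketch: the function name and the `core_multiplier` default are made up here; the 128MB target and the 2-4x core rule come from the guidance above.

```python
def recommended_partitions(data_size_mb, total_cores,
                           target_partition_mb=128, core_multiplier=3):
    """Rough partition-count estimate: take the larger of the
    size-based and core-based rules of thumb."""
    by_size = max(1, data_size_mb // target_partition_mb)
    by_cores = total_cores * core_multiplier
    return max(by_size, by_cores)

# 10GB of data on a 16-core cluster
print(recommended_partitions(10 * 1024, 16))  # 80
```

Treat the result as a starting point; with AQE enabled (section 5.1), Spark can coalesce shuffle partitions at runtime anyway.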

# Set the shuffle partition count
spark.conf.set("spark.sql.shuffle.partitions", 200)

# Range partitioning (sorted partitions)
df.repartitionByRange(100, "date")

# Hash partitioning
df.repartition(100, "user_id")  # hashed on user_id

2.3 Writing Partitioned Data

# Write one directory per partition value
df.write \
    .partitionBy("year", "month") \
    .parquet("output/partitioned_data")

# Resulting directory layout:
# output/partitioned_data/
#   year=2024/
#     month=01/
#       part-00000.parquet
#     month=02/
#       part-00000.parquet

# Reading partitioned data (partition pruning)
df = spark.read.parquet("output/partitioned_data")
# only the year=2024, month=1 partitions are read
df.filter((col("year") == 2024) & (col("month") == 1))

# Bucketing (join optimization; requires saveAsTable)
df.write \
    .bucketBy(100, "user_id") \
    .sortBy("timestamp") \
    .saveAsTable("bucketed_table")

3. Caching

3.1 Cache Basics

# DataFrame μΊμ‹œ
df.cache()           # MEMORY_AND_DISK κΈ°λ³Έ
df.persist()         # 동일

# μΊμ‹œ 레벨 μ§€μ •
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)           # λ©”λͺ¨λ¦¬λ§Œ
df.persist(StorageLevel.MEMORY_AND_DISK)       # λ©”λͺ¨λ¦¬ + λ””μŠ€ν¬
df.persist(StorageLevel.MEMORY_ONLY_SER)       # 직렬화 (λ©”λͺ¨λ¦¬ μ ˆμ•½)
df.persist(StorageLevel.DISK_ONLY)             # λ””μŠ€ν¬λ§Œ
df.persist(StorageLevel.MEMORY_AND_DISK_SER)   # 직렬화 + λ””μŠ€ν¬

# μΊμ‹œ ν•΄μ œ
df.unpersist()

# μΊμ‹œ μƒνƒœ 확인
spark.catalog.isCached("table_name")

3.2 μΊμ‹œ μ „λž΅

# μΊμ‹œκ°€ 효과적인 경우:
# 1. 동일 DataFrame을 μ—¬λŸ¬ 번 μ‚¬μš©
# 2. λΉ„μ‹Ό λ³€ν™˜ ν›„ μž¬μ‚¬μš©
# 3. 반볡 μ•Œκ³ λ¦¬μ¦˜

# μ˜ˆμ‹œ: μ—¬λŸ¬ μ§‘κ³„μ—μ„œ μž¬μ‚¬μš©
expensive_df = spark.read.parquet("large_data.parquet") \
    .filter(col("status") == "active") \
    .join(other_df, "key")

expensive_df.cache()

# μ—¬λŸ¬ μž‘μ—…μ—μ„œ μž¬μ‚¬μš©
result1 = expensive_df.groupBy("category").count()
result2 = expensive_df.groupBy("region").sum("amount")
result3 = expensive_df.filter(col("amount") > 1000).count()

# μž‘μ—… μ™„λ£Œ ν›„ ν•΄μ œ
expensive_df.unpersist()

3.3 μΊμ‹œ λͺ¨λ‹ˆν„°λ§

# Spark UIμ—μ„œ 확인 (http://localhost:4040/storage)

# ν”„λ‘œκ·Έλž˜λ° 방식 확인
sc = spark.sparkContext

# μΊμ‹œλœ RDD λͺ©λ‘
for rdd_id, rdd_info in sc._jsc.sc().getRDDStorageInfo():
    print(f"RDD {rdd_id}: {rdd_info}")

# 전체 μΊμ‹œ 클리어
spark.catalog.clearCache()

4. Join Strategies

4.1 Characteristics by Join Type

# Spark join strategies:
join_strategies = {
    "Broadcast Hash Join": {
        "condition": "one small table (< 10MB by default)",
        "performance": "fastest",
        "shuffle": "none (small table is broadcast)"
    },
    "Sort Merge Join": {
        "condition": "join between two large tables",
        "performance": "stable",
        "shuffle": "both sides shuffled and sorted"
    },
    "Shuffle Hash Join": {
        "condition": "one side comparatively small",
        "performance": "medium",
        "shuffle": "both sides shuffled"
    },
    "Broadcast Nested Loop Join": {
        "condition": "no equi-join condition (cross-like joins)",
        "performance": "slow",
        "shuffle": "none (broadcast)"
    }
}
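The table above can be condensed into a plain-Python approximation of the planner's choice. This is a deliberate simplification (the real planner also weighs hints, sortedness, statistics, and configuration); the function name is made up here.

```python
def choose_join_strategy(left_bytes, right_bytes, equi_join=True,
                         broadcast_threshold=10 * 1024 * 1024):
    """Very rough sketch of how Spark picks a join strategy."""
    if not equi_join:
        # no join keys to hash or sort on
        return "Broadcast Nested Loop Join"
    if min(left_bytes, right_bytes) <= broadcast_threshold:
        # one side fits under spark.sql.autoBroadcastJoinThreshold
        return "Broadcast Hash Join"
    # default for large equi-joins
    return "Sort Merge Join"

print(choose_join_strategy(10**12, 5 * 1024 * 1024))  # Broadcast Hash Join
print(choose_join_strategy(10**12, 10**11))           # Sort Merge Join
```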

4.2 Forcing a Broadcast Join

from pyspark.sql.functions import broadcast

# μž‘μ€ ν…Œμ΄λΈ” λΈŒλ‘œλ“œμΊμŠ€νŠΈ 힌트
large_df.join(broadcast(small_df), "key")

# μ„€μ •μœΌλ‘œ μž„κ³„κ°’ μ‘°μ •
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

# λΈŒλ‘œλ“œμΊμŠ€νŠΈ λΉ„ν™œμ„±ν™”
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# SQL 힌트
spark.sql("""
    SELECT /*+ BROADCAST(small_table) */
        large_table.*, small_table.name
    FROM large_table
    JOIN small_table ON large_table.id = small_table.id
""")

4.3 Join Optimization Tips

# 1. Filter before joining
# Worse
df1.join(df2, "key").filter(col("status") == "active")

# Better (less data enters the shuffle; Catalyst can often push the
# filter down itself, but being explicit is safer)
df1.filter(col("status") == "active").join(df2, "key")


# 2. Match join-key data types
# Worse (type mismatch forces an implicit cast)
df1.join(df2, df1.id == df2.id)  # id is string on one side, int on the other

# Better
df1 = df1.withColumn("id", col("id").cast("int"))
df1.join(df2, "id")


# 3. Handling skewed data (skew join)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")


# 4. Bucketing to optimize joins
# Bucket both tables on the join key at write time
df.write.bucketBy(100, "user_id").saveAsTable("users_bucketed")
other_df.write.bucketBy(100, "user_id").saveAsTable("orders_bucketed")

# Joining bucketed tables avoids the shuffle
spark.table("users_bucketed").join(spark.table("orders_bucketed"), "user_id")

5. Performance Tuning

5.1 Configuration Tuning

# Memory settings (executor/driver memory must be set before launch)
spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .getOrCreate()

# Parallelism
spark.conf.set("spark.default.parallelism", 200)
spark.conf.set("spark.sql.shuffle.partitions", 200)

# Adaptive Query Execution (AQE) - Spark 3.0+
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", True)

# Serialization (mainly affects RDD operations; set at launch)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Dynamic allocation (set at application launch)
spark.conf.set("spark.dynamicAllocation.enabled", True)
spark.conf.set("spark.dynamicAllocation.minExecutors", 2)
spark.conf.set("spark.dynamicAllocation.maxExecutors", 100)

5.2 Data Format Optimization

# Parquet settings
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")  # or zstd
spark.conf.set("spark.sql.parquet.filterPushdown", True)

# File/partition sizing
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")

# Merge small output partitions (with AQE)
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", False)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

# Verify column pruning in the plan
df.select("needed_column1", "needed_column2").explain()

5.3 Shuffle Optimization

# Tune the shuffle partition count
# Letting AQE adjust it automatically is recommended
spark.conf.set("spark.sql.adaptive.enabled", True)

# Manual sizing
data_size_gb = 10
partition_size_mb = 128
optimal_partitions = (data_size_gb * 1024) // partition_size_mb
spark.conf.set("spark.sql.shuffle.partitions", optimal_partitions)

# Compress shuffle output
spark.conf.set("spark.shuffle.compress", True)

# Compress shuffle spills
spark.conf.set("spark.shuffle.spill.compress", True)

# External shuffle service (set at launch)
spark.conf.set("spark.shuffle.service.enabled", True)

6. Performance Monitoring

6.1 Using the Spark UI

# Spark UI: http://<driver-host>:4040

# What each tab shows:
"""
Jobs:        job status and timing
Stages:      per-stage details (shuffle, data sizes)
Storage:     cached RDDs/DataFrames
Environment: configuration values
Executors:   executor status and memory
SQL:         SQL query plans
"""

# History server (for completed applications)
# spark.eventLog.enabled=true
# spark.history.fs.logDirectory=hdfs:///spark-history

6.2 Programmatic Monitoring

# Measure wall-clock time
import time

start = time.time()
result = df.groupBy("category").count().collect()
end = time.time()
print(f"Execution time: {end - start:.2f} seconds")
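The ad-hoc timing above can be wrapped in a reusable helper. A plain-Python sketch; the `timed` name is made up here, and it measures wall-clock time only, not Spark-internal metrics.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    """Print how long the wrapped block of code took."""
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"{label}: {time.perf_counter() - start:.2f} seconds")

# Hypothetical usage around a Spark action:
# with timed("category counts"):
#     df.groupBy("category").count().collect()
with timed("sleep"):
    time.sleep(0.05)
```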

# μ‹€ν–‰ κ³„νšμ—μ„œ μ…”ν”Œ 확인
df.explain(mode="formatted")

# 물리적 κ³„νšμ—μ„œ 쑰인 μ „λž΅ 확인
# Exchange = μ…”ν”Œ λ°œμƒ
# BroadcastHashJoin = λΈŒλ‘œλ“œμΊμŠ€νŠΈ 쑰인
# SortMergeJoin = μ†ŒνŠΈ λ¨Έμ§€ 쑰인

6.3 Collecting Metrics

# Estimate DataFrame size (uses the internal _jdf API; may change)
def estimate_size(df):
    """Estimated DataFrame size in bytes, from optimizer statistics."""
    return df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()

# Record count per partition
partition_counts = df.rdd.mapPartitions(
    lambda it: [sum(1 for _ in it)]
).collect()

print(f"Min: {min(partition_counts)}, Max: {max(partition_counts)}")
print(f"Skew ratio: {max(partition_counts) / (sum(partition_counts) / len(partition_counts)):.2f}")
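The skew calculation can be factored into a small pure-Python helper over the per-partition counts collected above (the `skew_ratio` name is made up here):

```python
def skew_ratio(partition_counts):
    """Max partition size divided by the mean; ~1.0 means balanced."""
    if not partition_counts:
        raise ValueError("no partitions")
    mean = sum(partition_counts) / len(partition_counts)
    return max(partition_counts) / mean

print(skew_ratio([100, 100, 100, 100]))  # 1.0
print(skew_ratio([10, 10, 10, 370]))     # 3.7
```

A ratio well above 1 (say, several times the mean) is the signal to apply the skew-handling techniques in section 7.1.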

7. Common Performance Problems and Fixes

7.1 Data Skew

# Problem: data concentrated on a few keys
# Symptom: a handful of tasks run far longer than the rest

# Fix 1: AQE skew join
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)

# Fix 2: add a salt key
from pyspark.sql.functions import rand, floor

num_salts = 10
df_salted = df.withColumn("salt", floor(rand() * num_salts))

# Salted join: replicate the other side once per salt value,
# then join on (key, salt)
result = df_salted.join(
    other_df.crossJoin(
        spark.range(num_salts).withColumnRenamed("id", "salt")
    ),
    ["key", "salt"]
).drop("salt")
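Why salting helps can be shown without Spark: hash-partitioning a single hot key sends every row to one partition, while mixing in a salt spreads them out. A pure-Python illustration (the `partition_of` function is a stand-in for a hash partitioner, not Spark's actual one):

```python
import zlib
from collections import Counter

num_partitions, num_salts = 8, 8
rows = [("hot_key", i) for i in range(1000)]  # 1000 rows, all with one hot key

def partition_of(key, salt=0):
    # Stand-in for a hash partitioner, mixing the salt into the key's hash
    return (zlib.crc32(key.encode()) + salt) % num_partitions

# Without salting: every row of the hot key lands in the same partition
unsalted = Counter(partition_of(k) for k, _ in rows)

# With salting: the hot key's rows are spread over num_salts partitions
salted = Counter(partition_of(k, i % num_salts) for i, (k, _) in enumerate(rows))

print(len(unsalted), max(unsalted.values()))  # 1 1000 -> one overloaded task
print(len(salted), max(salted.values()))      # 8 125  -> evenly spread
```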

# Fix 3: broadcast (when the other side is small enough)
result = df.join(broadcast(small_df), "key")

7.2 OOM (Out of Memory)

# Problem: not enough memory
# Symptom: OutOfMemoryError

# Fix 1: more executor memory (takes effect only at application launch)
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryOverhead", "2g")

# Fix 2: more partitions (spreads the data out)
df.repartition(500)

# Fix 3: drop caches that are no longer needed
spark.catalog.clearCache()

# Fix 4: lower the broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

7.3 Excessive Shuffle

# Problem: network/disk I/O caused by shuffles
# Symptom: long waits between stages

# Fix 1: filter before the shuffle
df.filter(col("status") == "active").groupBy("key").count()

# Fix 2: change the partitioning strategy
# data already partitioned on the same key can join without another shuffle
df1.repartition(100, "key").join(df2.repartition(100, "key"), "key")

# Fix 3: use bucketing
df.write.bucketBy(100, "key").saveAsTable("bucketed_table")

Exercises

Exercise 1: Execution plan analysis

Analyze the execution plan of a given query and identify its optimization opportunities.

Exercise 2: Join optimization

Design the best way to join a 100-million-row transaction table with a 1-million-row customer table.

Exercise 3: Skew handling

Improve aggregation performance when the data is concentrated in a few categories.


Summary

Area          Techniques
------------  ----------------------------------------------
Catalyst      Predicate Pushdown, Column Pruning
Partitioning  repartition, coalesce, partitionBy
Caching       cache, persist, StorageLevel
Joins         Broadcast, Sort Merge, bucketing
AQE           automatic partition coalescing, skew handling
