Spark SQL 최적화
Spark SQL 최적화¶
개요¶
Spark SQL의 성능을 최적화하기 위해서는 Catalyst 옵티마이저의 동작 원리를 이해하고, 파티셔닝, 캐싱, 조인 전략 등을 적절히 활용해야 합니다.
1. Catalyst Optimizer¶
1.1 μ€ν κ³ν μ΄ν΄¶
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("Optimization").getOrCreate()
df = spark.read.parquet("sales.parquet")
# μ€ν κ³ν νμΈ
query = df.filter(col("amount") > 100) \
.groupBy("category") \
.sum("amount")
# 논리적 계획
query.explain(mode="simple")
# 전체 계획 (논리적 + 물리적)
query.explain(mode="extended")
# λΉμ© κΈ°λ° κ³ν
query.explain(mode="cost")
# νμνλ μΆλ ₯
query.explain(mode="formatted")
1.2 Catalyst μ΅μ ν λ¨κ³¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Catalyst Optimizer λ¨κ³ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β 1. Analysis (λΆμ) β
β - 컬λΌ/ν
μ΄λΈ μ΄λ¦ νμΈ β
β - νμ
κ²μ¦ β
β β β
β 2. Logical Optimization (λ
Όλ¦¬μ μ΅μ ν) β
β - Predicate Pushdown (쑰건μ νΈμλ€μ΄) β
β - Column Pruning (μ»¬λΌ κ°μ§μΉκΈ°) β
β - Constant Folding (μμ ν΄λ©) β
β β β
β 3. Physical Planning (물리μ κ³ν) β
β - μ‘°μΈ μ λ΅ μ ν β
β - μ§κ³ μ λ΅ μ ν β
β β β
β 4. Code Generation (μ½λ μμ±) β
β - Whole-Stage Code Generation β
β - JIT μ»΄νμΌ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
1.3 μ£Όμ μ΅μ ν κΈ°λ²¶
# 1. Predicate Pushdown (쑰건μ νΈμλ€μ΄)
# νν°λ₯Ό λ°μ΄ν° μμ€ λ λ²¨λ‘ νΈμ
df = spark.read.parquet("data.parquet")
filtered = df.filter(col("date") == "2024-01-01") # Parquetμμ μ§μ νν°λ§
# 2. Column Pruning (μ»¬λΌ κ°μ§μΉκΈ°)
# νμν 컬λΌλ§ μ½κΈ°
df.select("name", "amount") # λ€λ₯Έ 컬λΌμ μ½μ§ μμ
# 3. Projection Pushdown
# SELECTλ₯Ό λ°μ΄ν° μμ€λ‘ νΈμ
df = spark.read.format("jdbc") \
.option("pushDownPredicate", "true") \
.load()
# 4. Constant Folding
# μμ ννμ 미리 κ³μ°
df.filter(col("value") > 1 + 2) # > 3μΌλ‘ λ³ν
2. 파티셔닝¶
2.1 파티션 개념¶
# 파티션 수 확인
df.rdd.getNumPartitions()
# 파티션 재분배
df.repartition(100) # 100κ° νν°μ
μΌλ‘
df.repartition("date") # μ»¬λΌ κΈ°μ€ νν°μ
λ
df.repartition(100, "date", "category") # μ»¬λΌ + μ μ§μ
# νν°μ
μ€μ΄κΈ° (μ
ν μμ΄)
df.coalesce(10) # μ
ν μμ΄ νν°μ
μΆμ
# νν°μ
μ 보 νμΈ
def print_partition_info(df):
    """Print the partition count and per-partition row counts of *df*.

    Counts rows inside each executor with ``mapPartitions`` so only one
    integer per partition travels to the driver. The previous version used
    ``glom().collect()``, which shipped *every row* of the DataFrame to the
    driver just to call ``len()`` — O(dataset) driver memory.

    Args:
        df: a PySpark DataFrame (any schema).
    """
    print(f"Partitions: {df.rdd.getNumPartitions()}")
    # One small int per partition instead of the partition contents.
    counts = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
    for idx, n_rows in enumerate(counts):
        print(f"Partition {idx}: {n_rows} rows")
2.2 νν°μ μ λ΅¶
# μ μ ν νν°μ
μ κ³μ°
"""
κΆμ₯ 곡μ:
- νν°μ
μ = λ°μ΄ν° ν¬κΈ°(MB) / 128MB
- λλ: ν΄λ¬μ€ν° μ½μ΄ μ * 2~4
μμ:
- 10GB λ°μ΄ν° β 10,000MB / 128MB β 80 νν°μ
- 100 μ½μ΄ ν΄λ¬μ€ν° β 200~400 νν°μ
"""
# νν°μ
μ μ€μ
spark.conf.set("spark.sql.shuffle.partitions", 200)
# λ²μ νν°μ
λ (μ λ ¬λ νν°μ
)
df.repartitionByRange(100, "date")
# ν΄μ νν°μ
λ
df.repartition(100, "user_id") # user_id κΈ°μ€ ν΄μ
2.3 νν°μ μ μ₯¶
# νν°μ
λ³ μ μ₯
df.write \
.partitionBy("year", "month") \
.parquet("output/partitioned_data")
# κ²°κ³Ό λλ ν 리 ꡬ쑰:
# output/partitioned_data/
# year=2024/
# month=01/
# part-00000.parquet
# month=02/
# part-00000.parquet
# νν°μ
λ°μ΄ν° μ½κΈ° (ν루λ)
df = spark.read.parquet("output/partitioned_data")
# year=2024, month=01 νν°μ
λ§ μ½μ
df.filter((col("year") == 2024) & (col("month") == 1))
# λ²ν·ν
(μ‘°μΈ μ΅μ ν)
df.write \
.bucketBy(100, "user_id") \
.sortBy("timestamp") \
.saveAsTable("bucketed_table")
3. μΊμ±¶
3.1 μΊμ κΈ°λ³Έ¶
# DataFrame μΊμ
df.cache() # MEMORY_AND_DISK κΈ°λ³Έ
df.persist() # λμΌ
# μΊμ λ 벨 μ§μ
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY) # λ©λͺ¨λ¦¬λ§
df.persist(StorageLevel.MEMORY_AND_DISK) # λ©λͺ¨λ¦¬ + λμ€ν¬
df.persist(StorageLevel.MEMORY_ONLY_SER) # μ§λ ¬ν (λ©λͺ¨λ¦¬ μ μ½)
df.persist(StorageLevel.DISK_ONLY) # λμ€ν¬λ§
df.persist(StorageLevel.MEMORY_AND_DISK_SER) # μ§λ ¬ν + λμ€ν¬
# μΊμ ν΄μ
df.unpersist()
# μΊμ μν νμΈ
spark.catalog.isCached("table_name")
3.2 μΊμ μ λ΅¶
# μΊμκ° ν¨κ³Όμ μΈ κ²½μ°:
# 1. λμΌ DataFrameμ μ¬λ¬ λ² μ¬μ©
# 2. λΉμΌ λ³ν ν μ¬μ¬μ©
# 3. λ°λ³΅ μκ³ λ¦¬μ¦
# μμ: μ¬λ¬ μ§κ³μμ μ¬μ¬μ©
expensive_df = spark.read.parquet("large_data.parquet") \
.filter(col("status") == "active") \
.join(other_df, "key")
expensive_df.cache()
# μ¬λ¬ μμ
μμ μ¬μ¬μ©
result1 = expensive_df.groupBy("category").count()
result2 = expensive_df.groupBy("region").sum("amount")
result3 = expensive_df.filter(col("amount") > 1000).count()
# μμ
μλ£ ν ν΄μ
expensive_df.unpersist()
3.3 μΊμ λͺ¨λν°λ§¶
# Spark UIμμ νμΈ (http://localhost:4040/storage)
# νλ‘κ·Έλλ° λ°©μ νμΈ
sc = spark.sparkContext
# μΊμλ RDD λͺ©λ‘
for rdd_id, rdd_info in sc._jsc.sc().getRDDStorageInfo():
print(f"RDD {rdd_id}: {rdd_info}")
# μ 체 μΊμ ν΄λ¦¬μ΄
spark.catalog.clearCache()
4. μ‘°μΈ μ λ΅¶
4.1 μ‘°μΈ μ νλ³ νΉμ±¶
# Spark μ‘°μΈ μ λ΅:
join_strategies = {
"Broadcast Hash Join": {
"condition": "μμ ν
μ΄λΈ (< 10MB κΈ°λ³Έ)",
"performance": "κ°μ₯ λΉ λ¦",
"shuffle": "μμ (μμ ν
μ΄λΈ λΈλ‘λμΊμ€νΈ)"
},
"Sort Merge Join": {
"condition": "ν° ν
μ΄λΈ κ° μ‘°μΈ",
"performance": "μμ μ ",
"shuffle": "μμͺ½ ν
μ΄λΈ μ
ν + μ λ ¬"
},
"Shuffle Hash Join": {
"condition": "νμͺ½μ΄ μμ λ",
"performance": "μ€κ°",
"shuffle": "μμͺ½ μ
ν"
},
"Broadcast Nested Loop Join": {
"condition": "μ‘°μΈ μ‘°κ±΄ μμ (Cross)",
"performance": "λλ¦Ό",
"shuffle": "μμ (λΈλ‘λμΊμ€νΈ)"
}
}
4.2 Broadcast Join κ°μ ¶
from pyspark.sql.functions import broadcast
# μμ ν
μ΄λΈ λΈλ‘λμΊμ€νΈ ννΈ
large_df.join(broadcast(small_df), "key")
# μ€μ μΌλ‘ μκ³κ° μ‘°μ
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024) # 100MB
# λΈλ‘λμΊμ€νΈ λΉνμ±ν
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# SQL ννΈ
spark.sql("""
SELECT /*+ BROADCAST(small_table) */
large_table.*, small_table.name
FROM large_table
JOIN small_table ON large_table.id = small_table.id
""")
4.3 μ‘°μΈ μ΅μ ν ν¶
# 1. μ‘°μΈ μ νν°λ§
# λμ μ
df1.join(df2, "key").filter(col("status") == "active")
# μ’μ μ
df1.filter(col("status") == "active").join(df2, "key")
# 2. μ‘°μΈ ν€ λ°μ΄ν° νμ
μΌμΉ
# λμ μ (νμ
λΆμΌμΉλ‘ μμμ μΊμ€ν
)
df1.join(df2, df1.id == df2.id) # idκ° string vs int
# μ’μ μ
df1 = df1.withColumn("id", col("id").cast("int"))
df1.join(df2, "id")
# 3. μ€ν λ°μ΄ν° μ²λ¦¬ (Skew Join)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# 4. λ²ν·ν
μΌλ‘ μ‘°μΈ μ΅μ ν
# ν
μ΄λΈ μμ± μ λ²ν·ν
df.write.bucketBy(100, "user_id").saveAsTable("users_bucketed")
other_df.write.bucketBy(100, "user_id").saveAsTable("orders_bucketed")
# λ²ν·ν
λ ν
μ΄λΈ μ‘°μΈ (μ
ν μμ)
spark.table("users_bucketed").join(spark.table("orders_bucketed"), "user_id")
5. μ±λ₯ νλ¶
5.1 μ€μ μ΅μ ν¶
# λ©λͺ¨λ¦¬ μ€μ
spark = SparkSession.builder \
.config("spark.executor.memory", "8g") \
.config("spark.executor.memoryOverhead", "2g") \
.config("spark.driver.memory", "4g") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.3") \
.getOrCreate()
# λ³λ ¬μ± μ€μ
spark.conf.set("spark.default.parallelism", 200)
spark.conf.set("spark.sql.shuffle.partitions", 200)
# Adaptive Query Execution (AQE) - Spark 3.0+
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", True)
# μ§λ ¬ν
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# λμ ν λΉ
spark.conf.set("spark.dynamicAllocation.enabled", True)
spark.conf.set("spark.dynamicAllocation.minExecutors", 2)
spark.conf.set("spark.dynamicAllocation.maxExecutors", 100)
5.2 λ°μ΄ν° νμ μ΅μ ν¶
# Parquet μ€μ
spark.conf.set("spark.sql.parquet.compression.codec", "snappy") # λλ zstd
spark.conf.set("spark.sql.parquet.filterPushdown", True)
# νμΌ ν¬κΈ° μ΅μ ν
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")
# μμ νμΌ λ³ν©
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", False)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
# μ»¬λΌ ν루λ νμΈ
df.select("needed_column1", "needed_column2").explain()
5.3 μ ν μ΅μ ν¶
# μ
ν νν°μ
μ μ΅μ ν
# AQEλ‘ μλ μ‘°μ κΆμ₯
spark.conf.set("spark.sql.adaptive.enabled", True)
# μλ μ€μ μ
data_size_gb = 10
partition_size_mb = 128
optimal_partitions = (data_size_gb * 1024) // partition_size_mb
spark.conf.set("spark.sql.shuffle.partitions", optimal_partitions)
# μ
ν μμΆ
spark.conf.set("spark.shuffle.compress", True)
# μ
ν μ€ν μ΅μν
spark.conf.set("spark.shuffle.spill.compress", True)
# μ
ν μλΉμ€ (μΈλΆ)
spark.conf.set("spark.shuffle.service.enabled", True)
6. μ±λ₯ λͺ¨λν°λ§¶
6.1 Spark UI νμ©¶
# Spark UI μ κ·Ό: http://<driver-host>:4040
# UI νλ³ μ 보:
"""
Jobs: Job μ€ν νν©, μκ°
Stages: Stageλ³ μμΈ (μ
ν, λ°μ΄ν° ν¬κΈ°)
Storage: μΊμλ RDD/DataFrame
Environment: μ€μ κ°
Executors: Executor μν, λ©λͺ¨λ¦¬
SQL: SQL 쿼리 κ³ν
"""
# μ΄λ ₯ μλ² (μλ£λ μμ
)
# spark.eventLog.enabled=true
# spark.history.fs.logDirectory=hdfs:///spark-history
6.2 νλ‘κ·Έλλ° λ°©μ λͺ¨λν°λ§¶
# μ€ν μκ° μΈ‘μ
import time
start = time.time()
result = df.groupBy("category").count().collect()
end = time.time()
print(f"Execution time: {end - start:.2f} seconds")
# μ€ν κ³νμμ μ
ν νμΈ
df.explain(mode="formatted")
# 물리μ κ³νμμ μ‘°μΈ μ λ΅ νμΈ
# Exchange = μ
ν λ°μ
# BroadcastHashJoin = λΈλ‘λμΊμ€νΈ μ‘°μΈ
# SortMergeJoin = μνΈ λ¨Έμ§ μ‘°μΈ
6.3 λ©νΈλ¦ μμ§¶
# DataFrame ν¬κΈ° μΆμ
def estimate_size(df):
    """Estimate the size of *df* in bytes from Catalyst's cost statistics.

    Reads ``sizeInBytes`` off the optimized logical plan through the
    private JVM bridge (``_jdf``). NOTE(review): this is an internal
    PySpark API, not a stable contract — the call chain may change
    between Spark versions; confirm against the deployed version. The
    value is the optimizer's *estimate*, not the exact on-disk size.
    """
    return df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
# νν°μ
λ³ λ μ½λ μ
partition_counts = df.rdd.mapPartitions(
lambda it: [sum(1 for _ in it)]
).collect()
print(f"Min: {min(partition_counts)}, Max: {max(partition_counts)}")
print(f"Skew ratio: {max(partition_counts) / (sum(partition_counts) / len(partition_counts)):.2f}")
7. μΌλ°μ μΈ μ±λ₯ λ¬Έμ μ ν΄κ²°¶
7.1 λ°μ΄ν° μ€ν (Skew)¶
# λ¬Έμ : νΉμ ν€μ λ°μ΄ν° μ§μ€
# μ¦μ: μΌλΆ Taskλ§ μ€λ κ±Έλ¦Ό
# ν΄κ²° 1: AQE μ€ν μ‘°μΈ
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
# ν΄κ²° 2: μνΈ ν€ μΆκ°
from pyspark.sql.functions import rand, floor
num_salts = 10
df_salted = df.withColumn("salt", floor(rand() * num_salts))
# μνΈ μ‘°μΈ
result = df_salted.join(
other_df.crossJoin(
spark.range(num_salts).withColumnRenamed("id", "salt")
),
["key", "salt"]
).drop("salt")
# ν΄κ²° 3: λΈλ‘λμΊμ€νΈ (κ°λ₯ν κ²½μ°)
result = df.join(broadcast(small_df), "key")
7.2 OOM (Out of Memory)¶
# λ¬Έμ : λ©λͺ¨λ¦¬ λΆμ‘±
# μ¦μ: OutOfMemoryError
# ν΄κ²° 1: Executor λ©λͺ¨λ¦¬ μ¦κ°
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryOverhead", "2g")
# ν΄κ²° 2: νν°μ
μ μ¦κ° (λ°μ΄ν° λΆμ°)
df.repartition(500)
# ν΄κ²° 3: λΆνμν μΊμ ν΄μ
spark.catalog.clearCache()
# ν΄κ²° 4: λΈλ‘λμΊμ€νΈ μκ³κ° κ°μ
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
7.3 μ ν κ³Όλ€¶
# λ¬Έμ : μ
νλ‘ μΈν λ€νΈμν¬/λμ€ν¬ I/O
# μ¦μ: Stage κ° λκΈ° μκ° μ¦κ°
# ν΄κ²° 1: μ
ν μ νν°λ§
df.filter(col("status") == "active").groupBy("key").count()
# ν΄κ²° 2: νν°μ
λ μ λ΅ λ³κ²½
# κ°μ ν€λ‘ νν°μ
λλ λ°μ΄ν°λ μ
ν μμ΄ μ‘°μΈ
df1.repartition(100, "key").join(df2.repartition(100, "key"), "key")
# ν΄κ²° 3: λ²ν·ν
μ¬μ©
df.write.bucketBy(100, "key").saveAsTable("bucketed_table")
μ°μ΅ λ¬Έμ ¶
λ¬Έμ 1: μ€ν κ³ν λΆμ¶
μ£Όμ΄μ§ 쿼리μ μ€ν κ³νμ λΆμνκ³ μ΅μ ν ν¬μΈνΈλ₯Ό μ°ΎμΌμΈμ.
λ¬Έμ 2: μ‘°μΈ μ΅μ ν¶
1μ΅ κ±΄μ νΈλμμ ν μ΄λΈκ³Ό 100λ§ κ±΄μ κ³ κ° ν μ΄λΈμ μ‘°μΈνλ μ΅μ μ λ°©λ²μ μ€κ³νμΈμ.
λ¬Έμ 3: μ€ν μ²λ¦¬¶
νΉμ μΉ΄ν κ³ λ¦¬μ λ°μ΄ν°κ° μ§μ€λ μν©μμ μ§κ³ μ±λ₯μ κ°μ νμΈμ.
μμ½¶
| μ΅μ ν μμ | κΈ°λ² |
|---|---|
| Catalyst | Predicate Pushdown, Column Pruning |
| νν°μ λ | repartition, coalesce, partitionBy |
| μΊμ± | cache, persist, StorageLevel |
| μ‘°μΈ | Broadcast, Sort Merge, λ²ν·ν |
| AQE | μλ νν°μ λ³ν©, μ€ν μ²λ¦¬ |