Apache Spark Basics

Overview

Apache Spark is a unified analytics engine for large-scale data processing. Its in-memory execution model makes it substantially faster than Hadoop MapReduce for many workloads, and it supports both batch and stream processing.


1. Spark Overview

1.1 Key Features of Spark

┌──────────────────────────────────────────────────────────────┐
│                    Apache Spark Features                     │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   1. Speed                                                   │
│      - In-memory: up to 100x faster than Hadoop MapReduce    │
│        (per the original Spark benchmarks)                   │
│      - Up to 10x faster for disk-based workloads             │
│                                                              │
│   2. Ease of Use                                             │
│      - APIs in Python, Scala, Java, and R                    │
│      - Built-in SQL interface                                │
│                                                              │
│   3. Generality                                              │
│      - SQL, streaming, ML, and graph processing              │
│      - Many workloads on a single engine                     │
│                                                              │
│   4. Compatibility                                           │
│      - Data sources: HDFS, S3, Cassandra, and more           │
│      - Runs on YARN, Kubernetes, or standalone clusters      │
│                                                              │
└──────────────────────────────────────────────────────────────┘

1.2 The Spark Ecosystem

┌─────────────────────────────────────────────────────────────────┐
│                         Spark Ecosystem                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐   │
│   │ Spark SQL  │ │ Structured │ │   MLlib    │ │   GraphX   │   │
│   │+ DataFrames│ │ Streaming  │ │  (Machine  │ │  (Graph)   │   │
│   │            │ │            │ │  Learning) │ │            │   │
│   └────────────┘ └────────────┘ └────────────┘ └────────────┘   │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                       Spark Core                        │   │
│   │                 (RDD, Task Scheduling)                  │   │
│   └─────────────────────────────────────────────────────────┘   │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │   Standalone   │   YARN   │   Kubernetes   │   Mesos    │   │
│   └─────────────────────────────────────────────────────────┘   │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │  HDFS  │  S3  │  GCS  │  Cassandra  │  JDBC  │  etc.    │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2. Spark Architecture

2.1 Cluster Layout

┌─────────────────────────────────────────────────────────────────┐
│                   Spark Cluster Architecture                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                     Driver Program                      │   │
│   │   ┌─────────────────────────────────────────────────┐   │   │
│   │   │                  SparkContext                   │   │   │
│   │   │   - Application entry point                     │   │   │
│   │   │   - Connects to the cluster                     │   │   │
│   │   │   - Creates and schedules jobs                  │   │   │
│   │   └─────────────────────────────────────────────────┘   │   │
│   └─────────────────────────────────────────────────────────┘   │
│                              ↓                                  │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                    Cluster Manager                      │   │
│   │        (Standalone, YARN, Kubernetes, Mesos)            │   │
│   └─────────────────────────────────────────────────────────┘   │
│                              ↓                                  │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│   │   Worker    │  │   Worker    │  │   Worker    │             │
│   │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │             │
│   │ │Executor │ │  │ │Executor │ │  │ │Executor │ │             │
│   │ │  Task   │ │  │ │  Task   │ │  │ │  Task   │ │             │
│   │ │  Task   │ │  │ │  Task   │ │  │ │  Task   │ │             │
│   │ │  Cache  │ │  │ │  Cache  │ │  │ │  Cache  │ │             │
│   │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │             │
│   └─────────────┘  └─────────────┘  └─────────────┘             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.2 Core Concepts

Concept     Description
Driver      Runs the main program and creates the SparkContext
Executor    Runs tasks on a worker node
Task        The basic unit of execution
Job         A parallel computation created by an action
Stage       A group of tasks within a job (bounded by shuffles)
Partition   A logical chunk of the data
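
A minimal sketch of how these concepts surface in PySpark: every RDD exposes its partition count, and each action submits one job, visible in the Spark UI (http://localhost:4040 by default while the application runs):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Concepts").getOrCreate()

# The second argument asks for 4 partitions explicitly
rdd = spark.sparkContext.parallelize(range(100), 4)
print(rdd.getNumPartitions())  # 4

# Each action submits a separate job, which the scheduler
# breaks into stages (at shuffle boundaries) and tasks.
rdd.count()  # job 1
rdd.sum()    # job 2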

2.3 Execution Flow

"""
Spark μ‹€ν–‰ 흐름:
1. Driverμ—μ„œ SparkContext 생성
2. μ• ν”Œλ¦¬μΌ€μ΄μ…˜ μ½”λ“œ 해석
3. Transformation β†’ DAG (Directed Acyclic Graph) 생성
4. Action 호좜 μ‹œ Job 생성
5. Job β†’ Stages β†’ Tasks둜 λΆ„ν•΄
6. Cluster Managerκ°€ Executor에 Task ν• λ‹Ή
7. Executorμ—μ„œ Task μ‹€ν–‰
8. κ²°κ³Όλ₯Ό Driver둜 λ°˜ν™˜
"""

# Example code flow
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Example").getOrCreate()

# Transformations (lazy -- nothing executes yet)
df = spark.read.csv("data.csv", header=True, inferSchema=True)  # read plan
df2 = df.filter(df.age > 20)                  # filter plan
df3 = df2.groupBy("city").count()             # aggregation plan

# Action (triggers actual execution)
result = df3.collect()  # job -> stages -> tasks -> run
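
Before any action runs, the pending plan can be inspected; explain() prints the plan Spark has built without executing anything:

# Inspect the lazy plan -- no job is submitted by explain()
df3.explain()
df3.explain(mode="formatted")  # Spark 3.0+, more readable layout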

3. RDD (Resilient Distributed Dataset)

3.1 The RDD Concept

An RDD is Spark's fundamental data structure: an immutable, distributed collection of elements.

from pyspark import SparkContext

sc = SparkContext("local[*]", "RDD Example")

# Ways to create an RDD
# 1. From a Python collection
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# 2. From an external data source
rdd2 = sc.textFile("data.txt")

# 3. By transforming an existing RDD
rdd3 = rdd1.map(lambda x: x * 2)

# RDD properties
"""
R - Resilient: fault-tolerant (recomputed from lineage)
D - Distributed: stored across the cluster
D - Dataset: a collection of data
"""

3.2 RDD Operations

# Transformations (lazy)
# - return a new RDD
# - only build an execution plan

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# map: apply a function to every element
mapped = rdd.map(lambda x: x * 2)  # [2, 4, 6, ...]

# filter: keep only elements that match a predicate
filtered = rdd.filter(lambda x: x % 2 == 0)  # [2, 4, 6, 8, 10]

# flatMap: map, then flatten the results
flat = rdd.flatMap(lambda x: [x, x * 2])  # [1, 2, 2, 4, 3, 6, ...]

# distinct: remove duplicates
distinct = rdd.distinct()

# union: concatenate two RDDs
union = rdd.union(sc.parallelize([11, 12]))

# groupByKey: group values by key (values come back as iterables)
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
grouped = pairs.groupByKey()  # [("a", [1, 3]), ("b", [2])]

# reduceByKey: reduce values per key
reduced = pairs.reduceByKey(lambda a, b: a + b)  # [("a", 4), ("b", 2)]
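
A quick sketch of why reduceByKey is usually preferred for aggregations: both lines below produce the same per-key sums, but groupByKey ships every value across the network before combining, while reduceByKey pre-aggregates within each partition first:

# Same result, different shuffle cost
sums_grouped = pairs.groupByKey().mapValues(sum).collect()
sums_reduced = pairs.reduceByKey(lambda a, b: a + b).collect()
assert sorted(sums_grouped) == sorted(sums_reduced)  # [("a", 4), ("b", 2)]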


# Actions (eager)
# - return or persist a result
# - trigger actual execution

# collect: bring every element back to the driver
result = rdd.collect()  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# count: number of elements
count = rdd.count()  # 10

# first / take: first element / first n elements
first = rdd.first()  # 1
take3 = rdd.take(3)  # [1, 2, 3]

# reduce: reduce the entire RDD
total = rdd.reduce(lambda a, b: a + b)  # 55

# foreach: apply a function to each element (side effects only;
# runs on the executors, so the prints land in executor logs)
rdd.foreach(lambda x: print(x))

# saveAsTextFile: write to files (the output directory must not already exist)
rdd.saveAsTextFile("output/")

3.3 Pair RDD Operations

# Operations on key-value (pair) RDDs
sales = sc.parallelize([
    ("Electronics", 100),
    ("Clothing", 50),
    ("Electronics", 200),
    ("Clothing", 75),
    ("Food", 30),
])

# 킀별 합계
total_by_category = sales.reduceByKey(lambda a, b: a + b)
# [("Electronics", 300), ("Clothing", 125), ("Food", 30)]

# 킀별 평균
count_sum = sales.combineByKey(
    lambda v: (v, 1),                      # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])  # mergeCombiners
)
avg_by_category = count_sum.mapValues(lambda x: x[0] / x[1])

# Sort by key
sorted_rdd = sales.sortByKey()

# Join
inventory = sc.parallelize([
    ("Electronics", 50),
    ("Clothing", 100),
])

joined = sales.join(inventory)
# [("Electronics", (100, 50)), ("Electronics", (200, 50)), ...]
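Note that join is an inner join, so "Food" drops out above (it has no match in inventory). leftOuterJoin keeps unmatched keys, pairing them with None:

left = sales.leftOuterJoin(inventory)
# "Food" survives with no inventory match:
# [..., ("Food", (30, None)), ...]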

4. Installation and Running

4.1 Local Installation (PySpark)

# Install with pip
pip install pyspark

# Check the version
pyspark --version

# Start the PySpark shell
pyspark

# Run a script with spark-submit
spark-submit my_script.py
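
For reference, a minimal sketch of what the my_script.py placeholder used in the spark-submit calls throughout this section could contain:

# my_script.py -- a small self-contained Spark job
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("MyScript").getOrCreate()
    df = spark.range(1000)                    # one "id" column, 0..999
    print(df.selectExpr("sum(id)").first())   # Row(sum(id)=499500)
    spark.stop()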

4.2 Docker Setup

# docker-compose.yaml
version: '3'

services:
  spark-master:
    image: bitnami/spark:3.4
    environment:
      - SPARK_MODE=master
    ports:
      - "8080:8080"
      - "7077:7077"

  spark-worker:
    image: bitnami/spark:3.4
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
    depends_on:
      - spark-master

# Start the cluster
docker-compose up -d

# Submit a job to the cluster
spark-submit --master spark://localhost:7077 my_script.py

4.3 Cluster Modes

# Standalone cluster
# (--total-executor-cores is the standalone sizing flag;
#  --num-executors applies to YARN)
spark-submit \
    --master spark://master:7077 \
    --deploy-mode cluster \
    --executor-memory 4G \
    --executor-cores 2 \
    --total-executor-cores 20 \
    my_script.py

# YARN cluster
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 4G \
    --num-executors 10 \
    my_script.py

# Kubernetes cluster
spark-submit \
    --master k8s://https://k8s-master:6443 \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=my-spark-image \
    my_script.py

5. SparkSession

5.1 Creating a SparkSession

from pyspark.sql import SparkSession

# Basic SparkSession
spark = SparkSession.builder \
    .appName("My Application") \
    .getOrCreate()

# With explicit configuration
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", 200) \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .enableHiveSupport() \
    .getOrCreate()

# Access the underlying SparkContext
sc = spark.sparkContext

# Read a configuration value
print(spark.conf.get("spark.sql.shuffle.partitions"))

# Stop the session
spark.stop()
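
SQL-related settings can also be changed on a live session without rebuilding it; a small sketch (core resources such as executor memory are fixed at startup and cannot be changed this way):

spark = SparkSession.builder.appName("Runtime Config").getOrCreate()

# Runtime-mutable SQL settings take effect immediately
spark.conf.set("spark.sql.shuffle.partitions", "50")
print(spark.conf.get("spark.sql.shuffle.partitions"))  # 50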

5.2 Common Settings

# Frequently used settings
common_configs = {
    # Memory
    "spark.executor.memory": "4g",
    "spark.driver.memory": "2g",
    "spark.executor.memoryOverhead": "512m",

    # Parallelism
    "spark.executor.cores": "4",
    "spark.default.parallelism": "100",
    "spark.sql.shuffle.partitions": "200",

    # Serialization
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",

    # Adaptive Query Execution (Spark 3.0+)
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",

    # Execution/storage memory split
    # (replaces the legacy pre-1.6 spark.storage.memoryFraction)
    "spark.memory.fraction": "0.6",

    # Shuffle
    "spark.shuffle.compress": "true",
}

# Apply the settings above when building a session
builder = SparkSession.builder.appName("Configured App")
for key, value in common_configs.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()

6. 기본 예제

6.1 Word Count

from pyspark.sql import SparkSession

# Create the SparkSession
spark = SparkSession.builder \
    .appName("Word Count") \
    .getOrCreate()

sc = spark.sparkContext

# ν…μŠ€νŠΈ 파일 읽기
text_rdd = sc.textFile("input.txt")

# Word count logic
word_counts = text_rdd \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False)

# Print the top 10 words
for word, count in word_counts.take(10):
    print(f"{word}: {count}")

# 파일둜 μ €μž₯
word_counts.saveAsTextFile("output/word_counts")

spark.stop()
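
The same count can be expressed with the DataFrame API, which lets the optimizer plan the aggregation; a self-contained sketch using the built-in split and explode functions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lower, split

spark = SparkSession.builder.appName("Word Count DF").getOrCreate()

lines = spark.read.text("input.txt")  # one row per line, column "value"
words = lines.select(explode(split(lower(col("value")), r"\s+")).alias("word"))
word_counts = (words.filter(col("word") != "")
                    .groupBy("word")
                    .count()
                    .orderBy(col("count").desc()))
word_counts.show(10)

spark.stop()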

6.2 DataFrame Basics

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg

spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()

# Create a DataFrame
data = [
    ("Alice", "Engineering", 50000),
    ("Bob", "Engineering", 60000),
    ("Charlie", "Marketing", 45000),
    ("Diana", "Marketing", 55000),
]

df = spark.createDataFrame(data, ["name", "department", "salary"])

# Basic operations
df.show()
df.printSchema()

# 필터링
df.filter(col("salary") > 50000).show()

# 집계
df.groupBy("department") \
    .agg(
        _sum("salary").alias("total_salary"),
        avg("salary").alias("avg_salary")
    ) \
    .show()

# Using SQL
df.createOrReplaceTempView("employees")
spark.sql("""
    SELECT department, AVG(salary) as avg_salary
    FROM employees
    GROUP BY department
""").show()

spark.stop()

Exercises

문제 1: RDD κΈ°λ³Έ μ—°μ‚°

From the numbers 1 through 100, select only the even numbers and compute the sum of their squares.

# 풀이
sc = spark.sparkContext
result = sc.parallelize(range(1, 101)) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x ** 2) \
    .reduce(lambda a, b: a + b)
print(result)  # 171700
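
As a sanity check, plain Python gives the same value:

assert sum(x ** 2 for x in range(2, 101, 2)) == 171700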

문제 2: Pair RDD

둜그 νŒŒμΌμ—μ„œ μ—λŸ¬ μˆ˜μ€€λ³„ 둜그 수λ₯Ό μ§‘κ³„ν•˜μ„Έμš”.

# Input lines look like: "2024-01-01 ERROR: Connection failed"
# (assumes the level is always the second whitespace-separated token)
logs = sc.textFile("logs.txt")
error_counts = logs \
    .map(lambda line: line.split()[1].replace(":", "")) \
    .map(lambda level: (level, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
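
For small result sets, countByKey() is a convenient shortcut action that returns the per-key counts to the driver as a dict (same line-format assumption as above):

level_counts = logs \
    .map(lambda line: (line.split()[1].replace(":", ""), 1)) \
    .countByKey()
# e.g. defaultdict(int, {"ERROR": 12, "WARN": 30, ...})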

Summary

Concept          Description
Spark            Unified engine for large-scale data processing
RDD              Fundamental distributed data structure
Transformation   Creates a new RDD (lazy)
Action           Returns a result (eager)
Driver           Node that runs the main program
Executor         Worker process that runs tasks

참고 자료

to navigate between lessons