Apache Spark Basics
Overview
Apache Spark is a unified analytics engine for large-scale data processing. Its in-memory execution model delivers much faster performance than Hadoop MapReduce, and it supports both batch processing and stream processing.
1. Spark Overview
1.1 Key Features of Spark
┌──────────────────────────────────────────────────────────┐
│                  Apache Spark Features                   │
├──────────────────────────────────────────────────────────┤
│ 1. Speed                                                 │
│    - In-memory processing: up to 100x faster than Hadoop │
│    - Up to 10x faster even for disk-based workloads      │
│                                                          │
│ 2. Ease of Use                                           │
│    - APIs in Python, Scala, Java, and R                  │
│    - Built-in SQL interface                              │
│                                                          │
│ 3. Generality                                            │
│    - SQL, streaming, ML, and graph processing            │
│    - Diverse workloads on a single engine                │
│                                                          │
│ 4. Compatibility                                         │
│    - Data sources: HDFS, S3, Cassandra, and more         │
│    - Clusters: YARN, Kubernetes, Standalone              │
└──────────────────────────────────────────────────────────┘
1.2 The Spark Ecosystem
┌────────────────────────────────────────────────────────────────┐
│                        Spark Ecosystem                         │
├────────────────────────┬──────────────────────┬───────┬────────┤
│ Spark SQL + DataFrames │ Structured Streaming │ MLlib │ GraphX │
├────────────────────────┴──────────────────────┴───────┴────────┤
│               Spark Core (RDD, task scheduling)                │
├────────────────────────────────────────────────────────────────┤
│             Standalone │ YARN │ Kubernetes │ Mesos             │
├────────────────────────────────────────────────────────────────┤
│           HDFS │ S3 │ GCS │ Cassandra │ JDBC │ etc.            │
└────────────────────────────────────────────────────────────────┘
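To make "one engine, many workloads" concrete, here is a minimal sketch in which a single SparkSession serves both Spark SQL and MLlib. The app name, inline data, and the points view are illustrative assumptions, not part of the original page.

# One SparkSession driving two workloads (illustrative sketch)
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName("EcosystemDemo").master("local[*]").getOrCreate()
df = spark.createDataFrame([(1.0, 2.0), (2.0, 4.1), (3.0, 5.9)], ["x", "y"])
# Spark SQL workload
df.createOrReplaceTempView("points")
spark.sql("SELECT COUNT(*) AS n FROM points").show()
# MLlib workload on the same data, same engine
features = VectorAssembler(inputCols=["x"], outputCol="features").transform(df)
model = LinearRegression(featuresCol="features", labelCol="y").fit(features)
print(model.coefficients)
spark.stop()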
2. Spark Architecture
2.1 Cluster Structure
Spark Cluster Architecture:

┌─────────────────────────────────────────┐
│              Driver Program             │
│  SparkContext                           │
│   - application entry point             │
│   - connects to the cluster             │
│   - creates and schedules Jobs          │
└────────────────────┬────────────────────┘
                     │
┌────────────────────┴────────────────────┐
│             Cluster Manager             │
│  (Standalone, YARN, Kubernetes, Mesos)  │
└────────────────────┬────────────────────┘
                     │
┌────────────┐  ┌────────────┐  ┌────────────┐
│   Worker   │  │   Worker   │  │   Worker   │
│ ┌────────┐ │  │ ┌────────┐ │  │ ┌────────┐ │
│ │Executor│ │  │ │Executor│ │  │ │Executor│ │
│ │  Task  │ │  │ │  Task  │ │  │ │  Task  │ │
│ │  Task  │ │  │ │  Task  │ │  │ │  Task  │ │
│ │ Cache  │ │  │ │ Cache  │ │  │ │ Cache  │ │
│ └────────┘ │  │ └────────┘ │  │ └────────┘ │
└────────────┘  └────────────┘  └────────────┘
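One way to see the Driver/Executor split from Python: code at the top level of a script runs in the driver process, while functions passed to RDD operations run in executor-side Python workers. The following is a small sketch (local mode; the app name is an illustrative assumption) in which the process IDs differ:

# Where does my code run? (illustrative sketch, local mode)
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("WhereAmI").getOrCreate()
print("driver pid:", os.getpid())  # executes in the driver process
worker_pids = (spark.sparkContext
    .parallelize(range(8), 2)
    .map(lambda x: os.getpid())    # executes in executor-side Python workers
    .distinct()
    .collect())
print("worker pids:", worker_pids)  # different from the driver pid
spark.stop()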
2.2 Core Concepts
| Concept | Description |
|---|---|
| Driver | Runs the main program and creates the SparkContext |
| Executor | Runs Tasks on Worker nodes |
| Task | The basic unit of execution |
| Job | A parallel computation triggered by an Action |
| Stage | A group of Tasks within a Job (bounded by shuffles) |
| Partition | A logical division of the data |
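These terms map onto things you can observe. The sketch below (assuming a running SparkSession named spark) shows Partitions via getNumPartitions(), a Stage boundary introduced by a shuffle, and a Job created by the collect() action:

# Observing Partitions, Stages, and Jobs (illustrative sketch)
rdd = spark.sparkContext.parallelize(range(100), 4)
print(rdd.getNumPartitions())                  # 4 partitions
pairs = rdd.map(lambda x: (x % 3, x))          # narrow transformation: same stage
sums = pairs.reduceByKey(lambda a, b: a + b)   # shuffle: new stage boundary
print(sums.toDebugString().decode())           # lineage shows the ShuffledRDD boundary
sums.collect()  # this action submits one Job (visible in the UI on port 4040)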
2.3 Execution Flow
"""
Spark execution flow:
1. The Driver creates a SparkContext
2. The application code is analyzed
3. Transformations build a DAG (Directed Acyclic Graph)
4. Calling an Action creates a Job
5. The Job is broken down into Stages and then Tasks
6. The Cluster Manager assigns Tasks to Executors
7. Executors run the Tasks
8. Results are returned to the Driver
"""
# Example code flow
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()
# Transformations (lazy - nothing runs yet)
df = spark.read.csv("data.csv", header=True)   # read plan
df2 = df.filter(df.age > 20)                   # filter plan
df3 = df2.groupBy("city").count()              # aggregation plan
# Action (triggers actual execution)
result = df3.collect()  # Job -> Stages -> Tasks -> execution
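Before the action runs, you can ask Spark for the plan it has built so far; for example, continuing the snippet above:

# Inspect the physical plan for df3 before any Job runs
df3.explain()  # shows the scan -> filter -> aggregate plan, including shuffles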
3. RDD (Resilient Distributed Dataset)
3.1 The RDD Concept
An RDD is Spark's fundamental data structure: an immutable, distributed collection of data.
from pyspark import SparkContext
sc = SparkContext("local[*]", "RDD Example")
# Ways to create an RDD
# 1. From a collection
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# 2. From external data
rdd2 = sc.textFile("data.txt")
# 3. By transforming an existing RDD
rdd3 = rdd1.map(lambda x: x * 2)
# RDD characteristics
"""
R - Resilient:   fault-tolerant (lost partitions recomputed from lineage)
D - Distributed: stored across the cluster
D - Dataset:     a collection of data
"""
3.2 RDD Operations
# Transformations (lazy)
# - return a new RDD
# - only build an execution plan
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# map: apply a function to every element
mapped = rdd.map(lambda x: x * 2)  # [2, 4, 6, ...]
# filter: keep only elements matching a predicate
filtered = rdd.filter(lambda x: x % 2 == 0)  # [2, 4, 6, 8, 10]
# flatMap: map, then flatten
flat = rdd.flatMap(lambda x: [x, x*2])  # [1, 2, 2, 4, 3, 6, ...]
# distinct: remove duplicates
distinct = rdd.distinct()
# union: combine two RDDs
union = rdd.union(sc.parallelize([11, 12]))
# groupByKey: group values by key
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
grouped = pairs.groupByKey()  # [("a", [1, 3]), ("b", [2])]
# reduceByKey: reduce values per key
reduced = pairs.reduceByKey(lambda a, b: a + b)  # [("a", 4), ("b", 2)]
# Actions (eager)
# - return a result or write output
# - trigger actual execution
# collect: bring all elements back to the Driver
result = rdd.collect()  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# count: number of elements
count = rdd.count()  # 10
# first / take: first element / first n elements
first = rdd.first()  # 1
take3 = rdd.take(3)  # [1, 2, 3]
# reduce: reduce the whole RDD
total = rdd.reduce(lambda a, b: a + b)  # 55
# foreach: apply a function to each element (side effects; runs on executors)
rdd.foreach(lambda x: print(x))
# saveAsTextFile: save to files
rdd.saveAsTextFile("output/")
3.3 Pair RDD Operations
# Operations on key-value (pair) RDDs
sales = sc.parallelize([
    ("Electronics", 100),
    ("Clothing", 50),
    ("Electronics", 200),
    ("Clothing", 75),
    ("Food", 30),
])
# Sum per key
total_by_category = sales.reduceByKey(lambda a, b: a + b)
# [("Electronics", 300), ("Clothing", 125), ("Food", 30)]
# Average per key
count_sum = sales.combineByKey(
    lambda v: (v, 1),                                          # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),                   # mergeValue
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])  # mergeCombiners
)
avg_by_category = count_sum.mapValues(lambda x: x[0] / x[1])
# Sorting
sorted_rdd = sales.sortByKey()
# Join
inventory = sc.parallelize([
    ("Electronics", 50),
    ("Clothing", 100),
])
joined = sales.join(inventory)
# [("Electronics", (100, 50)), ("Electronics", (200, 50)), ...]
4. Installation and Running
4.1 Local Installation (PySpark)
# Install with pip
pip install pyspark
# Check the version
pyspark --version
# Start the PySpark shell
pyspark
# Run a script with spark-submit
spark-submit my_script.py
4.2 Running with Docker
# docker-compose.yaml
version: '3'
services:
  spark-master:
    image: bitnami/spark:3.4
    environment:
      - SPARK_MODE=master
    ports:
      - "8080:8080"
      - "7077:7077"
  spark-worker:
    image: bitnami/spark:3.4
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
    depends_on:
      - spark-master
# Start the cluster
docker-compose up -d
# Submit a job to the cluster
spark-submit --master spark://localhost:7077 my_script.py
4.3 Cluster Modes
# Standalone cluster
# (standalone mode does not support cluster deploy mode for Python apps, and
#  --num-executors is YARN-only, so use client mode and --total-executor-cores)
spark-submit \
  --master spark://master:7077 \
  --deploy-mode client \
  --executor-memory 4G \
  --executor-cores 2 \
  --total-executor-cores 20 \
  my_script.py
# YARN cluster
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 4G \
  --num-executors 10 \
  my_script.py
# Kubernetes cluster
spark-submit \
  --master k8s://https://k8s-master:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=my-spark-image \
  my_script.py
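Whichever cluster mode you use, it can be handy to confirm from inside the submitted script which master and settings the application actually received. A small sketch (the fallback string is an illustrative assumption):

# Confirming master/deploy settings from inside a submitted script (sketch)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print(spark.sparkContext.master)  # e.g. "yarn" or "spark://master:7077"
print(spark.sparkContext.getConf().get("spark.executor.memory", "not set"))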
5. SparkSession
5.1 Creating a SparkSession
from pyspark.sql import SparkSession
# Basic SparkSession
spark = SparkSession.builder \
    .appName("My Application") \
    .getOrCreate()
# With configuration
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", 200) \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .enableHiveSupport() \
    .getOrCreate()
# Access the underlying SparkContext
sc = spark.sparkContext
# Inspect a setting
print(spark.conf.get("spark.sql.shuffle.partitions"))
# Stop the session
spark.stop()
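One behavior worth knowing: getOrCreate() returns the already-active session if one exists, so a second builder call in the same process reuses it along with its settings. A quick sketch:

# getOrCreate() reuses an active session
s1 = SparkSession.builder.appName("First").getOrCreate()
s2 = SparkSession.builder.appName("Second").getOrCreate()
print(s1 is s2)  # True: "Second" is ignored because a session already exists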
5.2 Common Settings
# Frequently used settings
common_configs = {
    # Memory
    "spark.executor.memory": "4g",
    "spark.driver.memory": "2g",
    "spark.executor.memoryOverhead": "512m",
    # Parallelism
    "spark.executor.cores": "4",
    "spark.default.parallelism": "100",
    "spark.sql.shuffle.partitions": "200",
    # Serialization
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    # Adaptive Query Execution (Spark 3.0+)
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # Unified memory management (replaces the legacy spark.storage.memoryFraction)
    "spark.memory.fraction": "0.6",
    # Shuffle
    "spark.shuffle.compress": "true",
}
# Applying settings
spark = SparkSession.builder \
    .config("spark.sql.shuffle.partitions", 100) \
    .config("spark.sql.adaptive.enabled", True) \
    .getOrCreate()
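The common_configs dictionary above can also be applied in bulk rather than repeating .config() calls; a minimal sketch:

# Applying a whole settings dictionary to the builder
builder = SparkSession.builder.appName("Configured App")
for key, value in common_configs.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()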
6. Basic Examples
6.1 Word Count
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Word Count") \
    .getOrCreate()
sc = spark.sparkContext
# Read a text file
text_rdd = sc.textFile("input.txt")
# Word count logic
word_counts = text_rdd \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False)
# Print the top 10 words
for word, count in word_counts.take(10):
    print(f"{word}: {count}")
# Save to files
word_counts.saveAsTextFile("output/word_counts")
spark.stop()
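For comparison, the same word count can be written with the DataFrame API instead of RDDs. A sketch (the whitespace regex and empty-string filter are assumptions for tokenizing):

# Word count with the DataFrame API (illustrative sketch)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, lower

spark = SparkSession.builder.appName("Word Count DF").getOrCreate()
lines = spark.read.text("input.txt")  # one string column named "value"
word_counts_df = (lines
    .select(explode(split(lower(col("value")), r"\s+")).alias("word"))
    .where(col("word") != "")
    .groupBy("word").count()
    .orderBy(col("count").desc()))
word_counts_df.show(10)
spark.stop()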
6.2 DataFrame Basics
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
# Create a DataFrame
data = [
    ("Alice", "Engineering", 50000),
    ("Bob", "Engineering", 60000),
    ("Charlie", "Marketing", 45000),
    ("Diana", "Marketing", 55000),
]
df = spark.createDataFrame(data, ["name", "department", "salary"])
# Basic operations
df.show()
df.printSchema()
# Filtering
df.filter(col("salary") > 50000).show()
# Aggregation
df.groupBy("department") \
    .agg(
        _sum("salary").alias("total_salary"),
        avg("salary").alias("avg_salary")
    ) \
    .show()
# Using SQL
df.createOrReplaceTempView("employees")
spark.sql("""
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
""").show()
spark.stop()
Practice Problems
Problem 1: Basic RDD Operations
From the numbers 1 through 100, select only the even numbers and compute the sum of their squares.
# Solution
sc = spark.sparkContext
result = sc.parallelize(range(1, 101)) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x ** 2) \
    .reduce(lambda a, b: a + b)
print(result)  # 171700
Problem 2: Pair RDDs
From a log file, count the number of log lines per error level.
# Input lines look like: "2024-01-01 ERROR: Connection failed"
logs = sc.textFile("logs.txt")
error_counts = logs \
    .map(lambda line: line.split()[1].replace(":", "")) \
    .map(lambda level: (level, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
Summary
| Concept | Description |
|---|---|
| Spark | Unified engine for large-scale data processing |
| RDD | Fundamental distributed data structure |
| Transformation | Creates a new RDD (lazy) |
| Action | Returns a result (eager) |
| Driver | Node that runs the main program |
| Executor | Worker process that runs Tasks |