Apache Spark Basics
Apache Spark Basics¶
Overview¶
Apache Spark is a unified analytics engine for large-scale data processing. It provides faster performance than Hadoop MapReduce through in-memory processing and supports both batch processing and streaming.
1. Spark Overview¶
1.1 Spark Features¶
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Apache Spark Features β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β 1. Speed β
β - 100x faster than Hadoop with in-memory processing β
β - 10x faster than disk-based processing β
β β
β 2. Ease of Use β
β - Supports Python, Scala, Java, R β
β - Provides SQL interface β
β β
β 3. Generality β
β - SQL, streaming, ML, graph processing β
β - Diverse workloads with one engine β
β β
β 4. Compatibility β
β - Various data sources: HDFS, S3, Cassandra, etc. β
β - YARN, Kubernetes, Standalone clusters β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
1.2 Spark Ecosystem¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Spark Ecosystem β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ββββββββββββββ ββββββββββββββ ββββββββββββββ ββββββββββββββ β
β β Spark SQL β β Streaming β β MLlib β β GraphX β β
β β + DF β β (Structured)β β(Machine β β (Graph) β β
β β β β β β Learning) β β β β
β ββββββββββββββ ββββββββββββββ ββββββββββββββ ββββββββββββββ β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β Spark Core β β
β β (RDD, Task Scheduling) β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β Standalone β YARN β Kubernetes β Mesos β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β HDFS β S3 β GCS β Cassandra β JDBC β etc β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2. Spark Architecture¶
2.1 Cluster Configuration¶
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Spark Cluster Architecture β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Driver Program β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β SparkContext β β β
β β β - Application entry point β β β
β β β - Connects to cluster β β β
β β β - Job creation and scheduling β β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Cluster Manager β β
β β (Standalone, YARN, Kubernetes, Mesos) β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βββββββββββββββ βββββββββββββββ βββββββββββββββ β
β β Worker β β Worker β β Worker β β
β β βββββββββ β β βββββββββ β β βββββββββ β β
β β βExecutorβ β β βExecutorβ β β βExecutorβ β β
β β β Task β β β β Task β β β β Task β β β
β β β Task β β β β Task β β β β Task β β β
β β β Cache β β β β Cache β β β β Cache β β β
β β βββββββββ β β βββββββββ β β βββββββββ β β
β βββββββββββββββ βββββββββββββββ βββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2.2 Core Concepts¶
| Concept | Description |
|---|---|
| Driver | Executes main program, creates SparkContext |
| Executor | Executes tasks on worker nodes |
| Task | Basic unit of execution |
| Job | Parallel computation triggered by an action |
| Stage | Group of tasks within a job (shuffle boundary) |
| Partition | Logical division unit of data |
2.3 Execution Flow¶
"""
Spark execution flow:
1. Create SparkContext in Driver
2. Parse application code
3. Transformations β Create DAG (Directed Acyclic Graph)
4. Create job when action is called
5. Decompose job β Stages β Tasks
6. Cluster Manager assigns tasks to Executors
7. Executors execute tasks
8. Return results to Driver
"""
# Example code flow
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()
# Transformations (Lazy - not executed)
df = spark.read.csv("data.csv", header=True) # Read plan
df2 = df.filter(df.age > 20) # Filter plan
df3 = df2.groupBy("city").count() # Aggregation plan
# Action (triggers actual execution)
result = df3.collect() # Create job β Stages β Tasks β Execute
3. RDD (Resilient Distributed Dataset)¶
3.1 RDD Concept¶
RDD is Spark's fundamental data structure, an immutable distributed collection of data.
from pyspark import SparkContext
sc = SparkContext("local[*]", "RDD Example")
# Ways to create RDD
# 1. From collection
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# 2. From external data
rdd2 = sc.textFile("data.txt")
# 3. From existing RDD transformation
rdd3 = rdd1.map(lambda x: x * 2)
# RDD properties
"""
R - Resilient: Fault-recoverable (recompute via lineage)
D - Distributed: Distributed across cluster
D - Dataset: Data collection
"""
3.2 RDD Operations¶
# Transformations (Lazy)
# - Return new RDD
# - Only create execution plan
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# map: Apply function to each element
mapped = rdd.map(lambda x: x * 2) # [2, 4, 6, ...]
# filter: Select elements matching condition
filtered = rdd.filter(lambda x: x % 2 == 0) # [2, 4, 6, 8, 10]
# flatMap: map then flatten
flat = rdd.flatMap(lambda x: [x, x*2]) # [1, 2, 2, 4, 3, 6, ...]
# distinct: Remove duplicates
distinct = rdd.distinct()
# union: Merge two RDDs
union = rdd.union(sc.parallelize([11, 12]))
# groupByKey: Group by key
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
grouped = pairs.groupByKey() # [("a", [1, 3]), ("b", [2])]
# reduceByKey: Reduce by key
reduced = pairs.reduceByKey(lambda a, b: a + b) # [("a", 4), ("b", 2)]
# Actions (Eager)
# - Return results or save
# - Trigger actual execution
# collect: Return all elements to Driver
result = rdd.collect() # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# count: Count elements
count = rdd.count() # 10
# first / take: First element / n elements
first = rdd.first() # 1
take3 = rdd.take(3) # [1, 2, 3]
# reduce: Reduce all
total = rdd.reduce(lambda a, b: a + b) # 55
# foreach: Apply function to each element (side effect)
rdd.foreach(lambda x: print(x))
# saveAsTextFile: Save to file
rdd.saveAsTextFile("output/")
3.3 Pair RDD Operations¶
# Key-Value pair RDD operations
sales = sc.parallelize([
("Electronics", 100),
("Clothing", 50),
("Electronics", 200),
("Clothing", 75),
("Food", 30),
])
# Sum by key
total_by_category = sales.reduceByKey(lambda a, b: a + b)
# [("Electronics", 300), ("Clothing", 125), ("Food", 30)]
# Average by key
count_sum = sales.combineByKey(
lambda v: (v, 1), # createCombiner
lambda acc, v: (acc[0] + v, acc[1] + 1), # mergeValue
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) # mergeCombiner
)
avg_by_category = count_sum.mapValues(lambda x: x[0] / x[1])
# Sort
sorted_rdd = sales.sortByKey()
# Join
inventory = sc.parallelize([
("Electronics", 50),
("Clothing", 100),
])
joined = sales.join(inventory)
# [("Electronics", (100, 50)), ("Electronics", (200, 50)), ...]
4. Installation and Execution¶
4.1 Local Installation (PySpark)¶
# pip installation
pip install pyspark
# Check version
pyspark --version
# Start PySpark shell
pyspark
# Execute script with spark-submit
spark-submit my_script.py
4.2 Docker Installation¶
# docker-compose.yaml
version: '3'
services:
spark-master:
image: bitnami/spark:3.4
environment:
- SPARK_MODE=master
ports:
- "8080:8080"
- "7077:7077"
spark-worker:
image: bitnami/spark:3.4
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
depends_on:
- spark-master
# Run
docker-compose up -d
# Submit job to cluster
spark-submit --master spark://localhost:7077 my_script.py
4.3 Cluster Mode¶
# Standalone cluster
spark-submit \
--master spark://master:7077 \
--deploy-mode cluster \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 10 \
my_script.py
# YARN cluster
spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 4G \
my_script.py
# Kubernetes cluster
spark-submit \
--master k8s://https://k8s-master:6443 \
--deploy-mode cluster \
--conf spark.kubernetes.container.image=my-spark-image \
my_script.py
5. SparkSession¶
5.1 Creating SparkSession¶
from pyspark.sql import SparkSession
# Basic SparkSession
spark = SparkSession.builder \
.appName("My Application") \
.getOrCreate()
# With configuration
spark = SparkSession.builder \
.appName("My Application") \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", 200) \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.config("spark.sql.adaptive.enabled", "true") \
.enableHiveSupport() \
.getOrCreate()
# Access SparkContext
sc = spark.sparkContext
# Check configuration
print(spark.conf.get("spark.sql.shuffle.partitions"))
# Stop session
spark.stop()
5.2 Common Configurations¶
# Frequently used configurations
common_configs = {
# Memory settings
"spark.executor.memory": "4g",
"spark.driver.memory": "2g",
"spark.executor.memoryOverhead": "512m",
# Parallelism settings
"spark.executor.cores": "4",
"spark.default.parallelism": "100",
"spark.sql.shuffle.partitions": "200",
# Serialization settings
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
# Adaptive Query Execution (Spark 3.0+)
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.skewJoin.enabled": "true",
# Cache settings
"spark.storage.memoryFraction": "0.6",
# Shuffle settings
"spark.shuffle.compress": "true",
}
# Apply configuration example
spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", 100) \
.config("spark.sql.adaptive.enabled", True) \
.getOrCreate()
6. Basic Examples¶
6.1 Word Count¶
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
.appName("Word Count") \
.getOrCreate()
sc = spark.sparkContext
# Read text file
text_rdd = sc.textFile("input.txt")
# Word count logic
word_counts = text_rdd \
.flatMap(lambda line: line.split()) \
.map(lambda word: (word.lower(), 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], ascending=False)
# Print results
for word, count in word_counts.take(10):
print(f"{word}: {count}")
# Save to file
word_counts.saveAsTextFile("output/word_counts")
spark.stop()
6.2 DataFrame Basics¶
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
# Create DataFrame
data = [
("Alice", "Engineering", 50000),
("Bob", "Engineering", 60000),
("Charlie", "Marketing", 45000),
("Diana", "Marketing", 55000),
]
df = spark.createDataFrame(data, ["name", "department", "salary"])
# Basic operations
df.show()
df.printSchema()
# Filtering
df.filter(col("salary") > 50000).show()
# Aggregation
df.groupBy("department") \
.agg(
_sum("salary").alias("total_salary"),
avg("salary").alias("avg_salary")
) \
.show()
# Using SQL
df.createOrReplaceTempView("employees")
spark.sql("""
SELECT department, AVG(salary) as avg_salary
FROM employees
GROUP BY department
""").show()
spark.stop()
Practice Problems¶
Problem 1: Basic RDD Operations¶
Find the sum of squares of even numbers from 1 to 100.
# Solution
sc = spark.sparkContext
result = sc.parallelize(range(1, 101)) \
.filter(lambda x: x % 2 == 0) \
.map(lambda x: x ** 2) \
.reduce(lambda a, b: a + b)
print(result) # 171700
Problem 2: Pair RDD¶
Aggregate log counts by error level from a log file.
# Input: "2024-01-01 ERROR: Connection failed"
logs = sc.textFile("logs.txt")
error_counts = logs \
.map(lambda line: line.split()[1].replace(":", "")) \
.map(lambda level: (level, 1)) \
.reduceByKey(lambda a, b: a + b) \
.collect()
Summary¶
| Concept | Description |
|---|---|
| Spark | Unified engine for large-scale data processing |
| RDD | Basic distributed data structure |
| Transformation | Creates new RDD (Lazy) |
| Action | Returns result (Eager) |
| Driver | Main program execution node |
| Executor | Task execution worker |