PySpark DataFrame
PySpark DataFrame¶
Overview
A Spark DataFrame is a high-level API that represents distributed data in tabular form. It provides SQL-like operations and is automatically optimized by the Catalyst optimizer.
1. Creating a SparkSession and DataFrames¶
1.1 Initializing a SparkSession¶
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
# Create (or reuse) a SparkSession — the entry point for the DataFrame API
spark = SparkSession.builder \
.appName("PySpark DataFrame Tutorial") \
.config("spark.sql.shuffle.partitions", 100) \
.config("spark.sql.adaptive.enabled", True) \
.getOrCreate()
# Check the Spark version
print(f"Spark Version: {spark.version}")
1.2 Ways to Create a DataFrame¶
# Method 1: create from a Python list of tuples
data = [
("Alice", 30, "Engineering"),
("Bob", 25, "Marketing"),
("Charlie", 35, "Engineering"),
]
df1 = spark.createDataFrame(data, ["name", "age", "department"])
# Method 2: create with an explicit schema
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("age", IntegerType(), nullable=True),
StructField("department", StringType(), nullable=True),
])
df2 = spark.createDataFrame(data, schema)
# Method 3: create from a list of dictionaries
dict_data = [
{"name": "Alice", "age": 30, "department": "Engineering"},
{"name": "Bob", "age": 25, "department": "Marketing"},
]
df3 = spark.createDataFrame(dict_data)
# Method 4: create from a Pandas DataFrame
import pandas as pd
pdf = pd.DataFrame(data, columns=["name", "age", "department"])
df4 = spark.createDataFrame(pdf)
# Method 5: create from an RDD
rdd = spark.sparkContext.parallelize(data)
df5 = rdd.toDF(["name", "age", "department"])
1.3 Reading DataFrames from Files¶
# CSV file
df_csv = spark.read.csv(
"data.csv",
header=True, # treat the first row as a header
inferSchema=True, # infer column types automatically (extra pass over the data)
sep=",", # field separator
nullValue="NA", # string that represents NULL
dateFormat="yyyy-MM-dd"
)
# Explicit schema (recommended - better performance, skips the inference pass)
schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("amount", DoubleType()),
StructField("date", DateType()),
])
df_csv = spark.read.csv("data.csv", header=True, schema=schema)
# Parquet file (recommended - columnar format)
df_parquet = spark.read.parquet("data.parquet")
# JSON file
df_json = spark.read.json("data.json")
# ORC file
df_orc = spark.read.orc("data.orc")
# JDBC (database)
df_jdbc = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/mydb") \
.option("dbtable", "public.users") \
.option("user", "user") \
.option("password", "password") \
.option("driver", "org.postgresql.Driver") \
.load()
# Delta Lake
df_delta = spark.read.format("delta").load("path/to/delta")
2. Basic DataFrame Operations¶
2.1 Inspecting Data¶
# Preview data
df.show() # first 20 rows
df.show(5) # first 5 rows
df.show(truncate=False) # do not truncate long column values
# Inspect the schema
df.printSchema()
df.dtypes # [(column_name, type), ...]
df.columns # list of column names
# Statistics
df.describe().show() # basic descriptive statistics
df.summary().show() # extended statistics
# Row count
df.count()
# Distinct value count
df.select("department").distinct().count()
# First row(s)
df.first()
df.head(5)
# Convert to Pandas (small datasets only — materializes on the driver)
pdf = df.toPandas()
2.2 Selecting Columns¶
from pyspark.sql.functions import col, lit
# Single column
df.select("name")
df.select(col("name"))
df.select(df.name)
df.select(df["name"])
# Multiple columns
df.select("name", "age")
df.select(["name", "age"])
df.select(col("name"), col("age"))
# All columns plus an extra one
df.select("*", lit(1).alias("constant"))
# Drop a column
df.drop("department")
# Rename a column
df.withColumnRenamed("name", "full_name")
# Rename several columns at once (positional)
df.toDF("name_new", "age_new", "dept_new")
# Using alias
df.select(col("name").alias("employee_name"))
2.3 Filtering¶
from pyspark.sql.functions import col
# Basic filters
df.filter(col("age") > 30)
df.filter(df.age > 30)
df.filter("age > 30") # SQL expression
df.where(col("age") > 30) # where is an alias for filter
# Compound conditions
df.filter((col("age") > 25) & (col("department") == "Engineering"))
df.filter((col("age") < 25) | (col("department") == "Marketing"))
df.filter(~(col("age") > 30)) # NOT
# String filters
df.filter(col("name").startswith("A"))
df.filter(col("name").endswith("e"))
df.filter(col("name").contains("li"))
df.filter(col("name").like("%li%"))
df.filter(col("name").rlike("^[A-C].*")) # regular expression
# IN condition
df.filter(col("department").isin(["Engineering", "Marketing"]))
# NULL handling
df.filter(col("age").isNull())
df.filter(col("age").isNotNull())
# BETWEEN
df.filter(col("age").between(25, 35))
3. Transformations¶
3.1 Adding/Modifying Columns¶
from pyspark.sql.functions import col, lit, when, concat, upper, lower, length
# Add a new column
df.withColumn("bonus", col("salary") * 0.1)
# Constant column
df.withColumn("country", lit("USA"))
# Modify an existing column
df.withColumn("name", upper(col("name")))
# Conditional column (CASE WHEN)
df.withColumn("age_group",
when(col("age") < 30, "Young")
.when(col("age") < 50, "Middle")
.otherwise("Senior")
)
# Several columns at once
df.withColumns({
"name_upper": upper(col("name")),
"age_plus_10": col("age") + 10,
})
# String concatenation
df.withColumn("full_info", concat(col("name"), lit(" - "), col("department")))
# Type casting
df.withColumn("age_double", col("age").cast("double"))
df.withColumn("age_string", col("age").cast(StringType()))
3.2 Aggregations¶
from pyspark.sql.functions import (
count, sum as _sum, avg, min as _min, max as _max,
countDistinct, collect_list, collect_set,
first, last, stddev, variance
)
# Whole-table aggregation
df.agg(
count("*").alias("total_count"),
_sum("salary").alias("total_salary"),
avg("salary").alias("avg_salary"),
_min("salary").alias("min_salary"),
_max("salary").alias("max_salary"),
).show()
# Per-group aggregation
df.groupBy("department").agg(
count("*").alias("employee_count"),
avg("salary").alias("avg_salary"),
_sum("salary").alias("total_salary"),
countDistinct("name").alias("unique_names"),
)
# Grouping by multiple columns
df.groupBy("department", "age_group").count()
# List/set aggregation
df.groupBy("department").agg(
collect_list("name").alias("employee_names"),
collect_set("age").alias("unique_ages"),
)
# Pivot table
df.groupBy("department") \
.pivot("age_group", ["Young", "Middle", "Senior"]) \
.agg(count("*"))
3.3 Sorting¶
from pyspark.sql.functions import col, asc, desc
# Single-column sort
df.orderBy("age") # ascending (default)
df.orderBy(col("age").desc()) # descending
df.orderBy(desc("age"))
# Multi-column sort
df.orderBy(["department", "age"])
df.orderBy(col("department").asc(), col("age").desc())
# NULL ordering
df.orderBy(col("age").asc_nulls_first())
df.orderBy(col("age").desc_nulls_last())
# sort is an alias for orderBy
df.sort("age")
3.4 Joins¶
# Sample data
employees = spark.createDataFrame([
(1, "Alice", 101),
(2, "Bob", 102),
(3, "Charlie", 101),
], ["id", "name", "dept_id"])
departments = spark.createDataFrame([
(101, "Engineering"),
(102, "Marketing"),
(103, "Finance"),
], ["dept_id", "dept_name"])
# Inner Join (default)
employees.join(departments, employees.dept_id == departments.dept_id)
employees.join(departments, "dept_id") # same column name on both sides
# Left Join
employees.join(departments, "dept_id", "left")
# Right Join
employees.join(departments, "dept_id", "right")
# Full Outer Join
employees.join(departments, "dept_id", "full")
# Cross Join (Cartesian product)
employees.crossJoin(departments)
# Semi Join (left rows that have a match; no right-side columns)
employees.join(departments, "dept_id", "left_semi")
# Anti Join (left rows that have no match)
employees.join(departments, "dept_id", "left_anti")
# Join on a compound condition
employees.join(
departments,
(employees.dept_id == departments.dept_id) & (employees.id > 1),
"inner"
)
4. Actions¶
4.1 Collecting Data¶
# Collect data to the driver
result = df.collect() # entire dataset (careful: driver memory)
result = df.take(10) # first 10 rows
result = df.first() # first row
result = df.head(5) # first 5 rows
# Flatten a single column to a Python list
ages = df.select("age").rdd.flatMap(lambda x: x).collect()
# To a Pandas DataFrame
pdf = df.toPandas() # small datasets only
# As an iterator (large datasets, one partition at a time)
for row in df.toLocalIterator():
    print(row)
4.2 Saving to Files¶
# Parquet (recommended)
df.write.parquet("output/data.parquet")
# Save modes
df.write.mode("overwrite").parquet("output/data.parquet")
# overwrite: replace existing data
# append: add to existing data
# ignore: do nothing if the target exists
# error: fail if the target exists (default)
# Partitioned save
df.write.partitionBy("date", "department").parquet("output/partitioned")
# CSV
df.write.csv("output/data.csv", header=True)
# JSON
df.write.json("output/data.json")
# Single output file
df.coalesce(1).write.csv("output/single_file.csv", header=True)
# JDBC (database)
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/mydb") \
.option("dbtable", "public.output_table") \
.option("user", "user") \
.option("password", "password") \
.mode("overwrite") \
.save()
5. UDF (User Defined Functions)¶
5.1 Basic UDFs¶
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType
# Define a plain Python function
def categorize_age(age):
    """Return an age-band label for *age*.

    None -> "Unknown", under 30 -> "Young", under 50 -> "Middle",
    everything else -> "Senior".
    """
    if age is None:
        return "Unknown"
    if age < 30:
        return "Young"
    if age < 50:
        return "Middle"
    return "Senior"
# Register a UDF (decorator style)
@udf(returnType=StringType())
def categorize_age_udf(age):
    """UDF mapping an age to "Unknown"/"Young"/"Middle"/"Senior"."""
    if age is None:
        return "Unknown"
    for upper_bound, label in ((30, "Young"), (50, "Middle")):
        if age < upper_bound:
            return label
    return "Senior"
# Register a UDF (function style)
categorize_udf = udf(categorize_age, StringType())
# Usage
df.withColumn("age_category", categorize_udf(col("age")))
df.withColumn("age_category", categorize_age_udf(col("age")))
5.2 Pandas UDFs (Better Performance)¶
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Scalar Pandas UDF (element-wise 1:1 mapping, vectorized via Arrow)
@pandas_udf(StringType())
def categorize_pandas_udf(age_series: pd.Series) -> pd.Series:
    """Vectorized age-band labeling: "Unknown"/"Young"/"Middle"/"Senior".

    Fix: nulls arrive in a scalar Pandas UDF as NaN, not None, so they
    must be tested with pd.isna(). The previous `x is None` check let
    NaN fall through both `<` comparisons (NaN < 30 is False) and
    mislabeled null ages as "Senior". pd.isna(None) is also True, so
    behavior for None inputs is unchanged.
    """
    return age_series.apply(
        lambda x: "Unknown" if pd.isna(x)
        else "Young" if x < 30
        else "Middle" if x < 50
        else "Senior"
    )
# Usage
df.withColumn("age_category", categorize_pandas_udf(col("age")))
# Grouped-Map Pandas UDF (process each group as a Pandas DataFrame)
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# Schema of the one-row summary DataFrame returned per group
result_schema = StructType([
StructField("department", StringType()),
StructField("avg_salary", DoubleType()),
StructField("employee_count", IntegerType()),
])
# NOTE(review): PandasUDFType.GROUPED_MAP is deprecated since Spark 3.0;
# df.groupby(...).applyInPandas(func, schema) is the modern equivalent — confirm target Spark version.
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def analyze_department(pdf: pd.DataFrame) -> pd.DataFrame:
    """Collapse one department's rows into a single summary row."""
    return pd.DataFrame({
        "department": [pdf["department"].iloc[0]],
        "avg_salary": [pdf["salary"].mean()],
        "employee_count": [len(pdf)],
    })
# Usage
df.groupby("department").apply(analyze_department)
5.3 Using UDFs in SQL¶
# Register a UDF for use from SQL
spark.udf.register("categorize_age", categorize_age, StringType())
# Use it in a SQL query
df.createOrReplaceTempView("employees")
spark.sql("""
SELECT name, age, categorize_age(age) as age_category
FROM employees
""").show()
6. Window Functions¶
from pyspark.sql.window import Window
from pyspark.sql.functions import (
row_number, rank, dense_rank,
lead, lag, sum as _sum, avg,
first, last, ntile
)
# Window specifications
window_dept = Window.partitionBy("department").orderBy("salary")
window_all = Window.orderBy("salary")
# Ranking functions
df.withColumn("row_num", row_number().over(window_dept))
df.withColumn("rank", rank().over(window_dept))
df.withColumn("dense_rank", dense_rank().over(window_dept))
df.withColumn("ntile_4", ntile(4).over(window_dept))
# Previous/next values
df.withColumn("prev_salary", lag("salary", 1).over(window_dept))
df.withColumn("next_salary", lead("salary", 1).over(window_dept))
# Cumulative sum
window_cumsum = Window.partitionBy("department") \
.orderBy("date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("cumsum_salary", _sum("salary").over(window_cumsum))
# Moving average
window_moving = Window.partitionBy("department") \
.orderBy("date") \
.rowsBetween(-2, 0) # current row + 2 preceding
df.withColumn("moving_avg", avg("salary").over(window_moving))
# First/last value within the window
df.withColumn("first_name", first("name").over(window_dept))
df.withColumn("last_name", last("name").over(window_dept))
Exercises¶
Exercise 1: Data Transformation¶
From the sales data, compute total and average revenue by month and category.
Exercise 2: Window Functions¶
Rank employees by salary within each department and extract the top 3 earners per department.
Exercise 3: Writing a UDF¶
Write and apply a UDF that extracts the domain from an email address.
Summary¶
| Operation | Description | Example |
|---|---|---|
| select | Select columns | df.select("name", "age") |
| filter | Filter rows | df.filter(col("age") > 30) |
| groupBy | Group rows | df.groupBy("dept").agg(...) |
| join | Join tables | df1.join(df2, "key") |
| orderBy | Sort rows | df.orderBy(desc("salary")) |
| withColumn | Add/modify a column | df.withColumn("new", ...) |