PySpark DataFrame

Overview

Spark DataFrame์€ ๋ถ„์‚ฐ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ํ…Œ์ด๋ธ” ํ˜•ํƒœ๋กœ ํ‘œํ˜„ํ•˜๋Š” ๊ณ ์ˆ˜์ค€ API์ž…๋‹ˆ๋‹ค. SQL๊ณผ ์œ ์‚ฌํ•œ ์—ฐ์‚ฐ์„ ์ œ๊ณตํ•˜๋ฉฐ, Catalyst ์˜ตํ‹ฐ๋งˆ์ด์ €๋ฅผ ํ†ตํ•ด ์ž๋™์œผ๋กœ ์ตœ์ ํ™”๋ฉ๋‹ˆ๋‹ค.


1. SparkSession and DataFrame Creation

1.1 Initializing a SparkSession

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

# SparkSession ์ƒ์„ฑ
spark = SparkSession.builder \
    .appName("PySpark DataFrame Tutorial") \
    .config("spark.sql.shuffle.partitions", 100) \
    .config("spark.sql.adaptive.enabled", True) \
    .getOrCreate()

# Spark ๋ฒ„์ „ ํ™•์ธ
print(f"Spark Version: {spark.version}")

1.2 Ways to Create a DataFrame

# Method 1: from a Python list of tuples
data = [
    ("Alice", 30, "Engineering"),
    ("Bob", 25, "Marketing"),
    ("Charlie", 35, "Engineering"),
]
df1 = spark.createDataFrame(data, ["name", "age", "department"])

# Method 2: with an explicit schema
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
    StructField("department", StringType(), nullable=True),
])
df2 = spark.createDataFrame(data, schema)

# Method 3: from a list of dictionaries
dict_data = [
    {"name": "Alice", "age": 30, "department": "Engineering"},
    {"name": "Bob", "age": 25, "department": "Marketing"},
]
df3 = spark.createDataFrame(dict_data)

# Method 4: from a Pandas DataFrame
import pandas as pd
pdf = pd.DataFrame(data, columns=["name", "age", "department"])
df4 = spark.createDataFrame(pdf)

# Method 5: from an RDD
rdd = spark.sparkContext.parallelize(data)
df5 = rdd.toDF(["name", "age", "department"])

1.3 ํŒŒ์ผ์—์„œ DataFrame ์ฝ๊ธฐ

# CSV ํŒŒ์ผ
df_csv = spark.read.csv(
    "data.csv",
    header=True,           # ์ฒซ ํ–‰์„ ํ—ค๋”๋กœ
    inferSchema=True,      # ์Šคํ‚ค๋งˆ ์ž๋™ ์ถ”๋ก 
    sep=",",               # ๊ตฌ๋ถ„์ž
    nullValue="NA",        # NULL ํ‘œํ˜„
    dateFormat="yyyy-MM-dd"
)

# Explicit schema (recommended - skips the inference pass and improves performance)
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("amount", DoubleType()),
    StructField("date", DateType()),
])
df_csv = spark.read.csv("data.csv", header=True, schema=schema)

# Parquet file (recommended - columnar format)
df_parquet = spark.read.parquet("data.parquet")

# JSON file
df_json = spark.read.json("data.json")

# ORC file
df_orc = spark.read.orc("data.orc")

# JDBC (database)
df_jdbc = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "public.users") \
    .option("user", "user") \
    .option("password", "password") \
    .option("driver", "org.postgresql.Driver") \
    .load()

# Delta Lake
df_delta = spark.read.format("delta").load("path/to/delta")
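By default a JDBC read runs as a single task against the database; the reader's built-in partitioning options split it into parallel reads. A hedged sketch (the partition column and bounds below are hypothetical and must match your table):

# Parallel JDBC read: 8 partitions split on the numeric column "id".
# lowerBound/upperBound only shape the partition ranges; they do not filter rows.
df_jdbc_parallel = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "public.users") \
    .option("user", "user") \
    .option("password", "password") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "8") \
    .load()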

2. Basic DataFrame Operations

2.1 ๋ฐ์ดํ„ฐ ํ™•์ธ

# ๋ฐ์ดํ„ฐ ๋ฏธ๋ฆฌ๋ณด๊ธฐ
df.show()           # ์ƒ์œ„ 20ํ–‰
df.show(5)          # ์ƒ์œ„ 5ํ–‰
df.show(truncate=False)  # ์ปฌ๋Ÿผ ์ž˜๋ฆผ ์—†์ด

# ์Šคํ‚ค๋งˆ ํ™•์ธ
df.printSchema()
df.dtypes           # [(์ปฌ๋Ÿผ๋ช…, ํƒ€์ž…), ...]
df.columns          # ์ปฌ๋Ÿผ ๋ชฉ๋ก

# ํ†ต๊ณ„ ์ •๋ณด
df.describe().show()        # ๊ธฐ์ˆ  ํ†ต๊ณ„
df.summary().show()         # ํ™•์žฅ ํ†ต๊ณ„

# ๋ ˆ์ฝ”๋“œ ์ˆ˜
df.count()

# ์œ ๋‹ˆํฌ ๊ฐ’ ์ˆ˜
df.select("department").distinct().count()

# ์ฒซ ๋ฒˆ์งธ ํ–‰
df.first()
df.head(5)

# Pandas๋กœ ๋ณ€ํ™˜ (์ž‘์€ ๋ฐ์ดํ„ฐ์…‹๋งŒ)
pdf = df.toPandas()
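Since toPandas() pulls every row into driver memory, a defensive habit (a sketch, not a hard rule) is to cap the row count first:

# Cap the rows before converting so a large table cannot exhaust the driver
preview_pdf = df.limit(1000).toPandas()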

2.2 Selecting Columns

from pyspark.sql.functions import col, lit

# ๋‹จ์ผ ์ปฌ๋Ÿผ
df.select("name")
df.select(col("name"))
df.select(df.name)
df.select(df["name"])

# ์—ฌ๋Ÿฌ ์ปฌ๋Ÿผ
df.select("name", "age")
df.select(["name", "age"])
df.select(col("name"), col("age"))

# ๋ชจ๋“  ์ปฌ๋Ÿผ + ์ถ”๊ฐ€ ์ปฌ๋Ÿผ
df.select("*", lit(1).alias("constant"))

# ์ปฌ๋Ÿผ ์ œ์™ธ
df.drop("department")

# ์ปฌ๋Ÿผ ์ด๋ฆ„ ๋ณ€๊ฒฝ
df.withColumnRenamed("name", "full_name")

# ์—ฌ๋Ÿฌ ์ปฌ๋Ÿผ ์ด๋ฆ„ ๋ณ€๊ฒฝ
df.toDF("name_new", "age_new", "dept_new")

# alias ์‚ฌ์šฉ
df.select(col("name").alias("employee_name"))

2.3 Filtering

from pyspark.sql.functions import col

# Basic filters
df.filter(col("age") > 30)
df.filter(df.age > 30)
df.filter("age > 30")           # SQL expression string
df.where(col("age") > 30)       # where is an alias for filter

# Compound conditions (parentheses are required: & and | bind tighter than comparisons)
df.filter((col("age") > 25) & (col("department") == "Engineering"))
df.filter((col("age") < 25) | (col("department") == "Marketing"))
df.filter(~(col("age") > 30))   # NOT

# String filters
df.filter(col("name").startswith("A"))
df.filter(col("name").endswith("e"))
df.filter(col("name").contains("li"))
df.filter(col("name").like("%li%"))
df.filter(col("name").rlike("^[A-C].*"))  # regular expression

# IN condition
df.filter(col("department").isin(["Engineering", "Marketing"]))

# NULL handling
df.filter(col("age").isNull())
df.filter(col("age").isNotNull())

# BETWEEN
df.filter(col("age").between(25, 35))

3. Transformations

3.1 Adding and Modifying Columns

from pyspark.sql.functions import col, lit, when, concat, upper, lower, length

# Add a new column
df.withColumn("bonus", col("salary") * 0.1)

# Constant column
df.withColumn("country", lit("USA"))

# Modify an existing column
df.withColumn("name", upper(col("name")))

# Conditional column (CASE WHEN)
df.withColumn("age_group",
    when(col("age") < 30, "Young")
    .when(col("age") < 50, "Middle")
    .otherwise("Senior")
)

# Multiple columns at once (withColumns requires Spark 3.3+)
df.withColumns({
    "name_upper": upper(col("name")),
    "age_plus_10": col("age") + 10,
})

# String concatenation
df.withColumn("full_info", concat(col("name"), lit(" - "), col("department")))

# Type casting
df.withColumn("age_double", col("age").cast("double"))
df.withColumn("age_string", col("age").cast(StringType()))

3.2 Aggregations

from pyspark.sql.functions import (
    count, sum as _sum, avg, min as _min, max as _max,
    countDistinct, collect_list, collect_set,
    first, last, stddev, variance
)

# Whole-table aggregation
df.agg(
    count("*").alias("total_count"),
    _sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    _min("salary").alias("min_salary"),
    _max("salary").alias("max_salary"),
).show()

# Per-group aggregation
df.groupBy("department").agg(
    count("*").alias("employee_count"),
    avg("salary").alias("avg_salary"),
    _sum("salary").alias("total_salary"),
    countDistinct("name").alias("unique_names"),
)

# Group by multiple columns
df.groupBy("department", "age_group").count()

# List/set aggregation
df.groupBy("department").agg(
    collect_list("name").alias("employee_names"),
    collect_set("age").alias("unique_ages"),
)

# ํ”ผ๋ฒ— ํ…Œ์ด๋ธ”
df.groupBy("department") \
    .pivot("age_group", ["Young", "Middle", "Senior"]) \
    .agg(count("*"))
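agg also accepts a dict shorthand that maps a column to an aggregate function name; it is terser, but gives up control over the output column names:

# Dict shorthand - output columns get generated names like avg(salary)
df.groupBy("department").agg({"salary": "avg", "*": "count"})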

3.3 Sorting

from pyspark.sql.functions import col, asc, desc

# ๋‹จ์ผ ์ปฌ๋Ÿผ ์ •๋ ฌ
df.orderBy("age")                    # ์˜ค๋ฆ„์ฐจ์ˆœ (๊ธฐ๋ณธ)
df.orderBy(col("age").desc())        # ๋‚ด๋ฆผ์ฐจ์ˆœ
df.orderBy(desc("age"))

# ์—ฌ๋Ÿฌ ์ปฌ๋Ÿผ ์ •๋ ฌ
df.orderBy(["department", "age"])
df.orderBy(col("department").asc(), col("age").desc())

# NULL ์ฒ˜๋ฆฌ
df.orderBy(col("age").asc_nulls_first())
df.orderBy(col("age").desc_nulls_last())

# sort๋Š” orderBy์™€ ๋™์ผ
df.sort("age")

3.4 ์กฐ์ธ

# ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ
employees = spark.createDataFrame([
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Charlie", 101),
], ["id", "name", "dept_id"])

departments = spark.createDataFrame([
    (101, "Engineering"),
    (102, "Marketing"),
    (103, "Finance"),
], ["dept_id", "dept_name"])

# Inner join (default)
employees.join(departments, employees.dept_id == departments.dept_id)
employees.join(departments, "dept_id")  # same column name on both sides (result keeps a single dept_id)

# Left Join
employees.join(departments, "dept_id", "left")

# Right Join
employees.join(departments, "dept_id", "right")

# Full Outer Join
employees.join(departments, "dept_id", "full")

# Cross Join (Cartesian)
employees.crossJoin(departments)

# Semi join (left-side columns only, rows that have a match)
employees.join(departments, "dept_id", "left_semi")

# Anti join (left-side columns only, rows without a match)
employees.join(departments, "dept_id", "left_anti")

# ๋ณตํ•ฉ ์กฐ๊ฑด ์กฐ์ธ
employees.join(
    departments,
    (employees.dept_id == departments.dept_id) & (employees.id > 1),
    "inner"
)
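When one side of a join is small enough to fit in executor memory, broadcasting it avoids shuffling the large side. A minimal sketch with the tables above:

from pyspark.sql.functions import broadcast

# Ship the small departments table to every executor; the join then
# happens locally, with no shuffle of the employees table.
employees.join(broadcast(departments), "dept_id")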

4. Actions

4.1 ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘

# Collect data to the driver
result = df.collect()           # all rows (careful: driver memory)
result = df.take(10)            # first 10 rows
result = df.first()             # first row
result = df.head(5)             # first 5 rows

# Flatten a single column into a Python list
ages = df.select("age").rdd.flatMap(lambda x: x).collect()

# To a Pandas DataFrame
pdf = df.toPandas()             # small datasets only

# As an iterator (for large data; streams partition by partition)
for row in df.toLocalIterator():
    print(row)
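Each action recomputes the DataFrame's lineage from the source, so when several actions share an intermediate result, caching it can save repeated work. A sketch (assuming col is imported from pyspark.sql.functions):

# Cache an intermediate result reused by multiple actions
filtered = df.filter(col("age") > 30).cache()
filtered.count()      # the first action materializes the cache
filtered.show(5)      # later actions read from memory
filtered.unpersist()  # release the cached data when done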

4.2 ํŒŒ์ผ ์ €์žฅ

# Parquet (recommended)
df.write.parquet("output/data.parquet")

# Specify the save mode
df.write.mode("overwrite").parquet("output/data.parquet")
# overwrite: replace existing data
# append: add to existing data
# ignore: skip the write if the path exists
# error (errorifexists): fail if the path exists (default)

# Partitioned write
df.write.partitionBy("date", "department").parquet("output/partitioned")

# CSV
df.write.csv("output/data.csv", header=True)

# JSON
df.write.json("output/data.json")

# ๋‹จ์ผ ํŒŒ์ผ๋กœ ์ €์žฅ
df.coalesce(1).write.csv("output/single_file.csv", header=True)

# JDBC (database)
df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "public.output_table") \
    .option("user", "user") \
    .option("password", "password") \
    .mode("overwrite") \
    .save()
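File writers take codec settings through option as well; for Parquet, snappy is already the default codec, so the sketch below merely makes the choice explicit:

# Make the compression codec explicit (snappy is the Parquet default)
df.write.option("compression", "snappy").mode("overwrite").parquet("output/compressed")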

5. UDF (User Defined Functions)

5.1 Basic UDFs

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType

# Python ํ•จ์ˆ˜ ์ •์˜
def categorize_age(age):
    if age is None:
        return "Unknown"
    elif age < 30:
        return "Young"
    elif age < 50:
        return "Middle"
    else:
        return "Senior"

# UDF ๋“ฑ๋ก (๋ฐ์ฝ”๋ ˆ์ดํ„ฐ ๋ฐฉ์‹)
@udf(returnType=StringType())
def categorize_age_udf(age):
    if age is None:
        return "Unknown"
    elif age < 30:
        return "Young"
    elif age < 50:
        return "Middle"
    else:
        return "Senior"

# UDF ๋“ฑ๋ก (ํ•จ์ˆ˜ ๋ฐฉ์‹)
categorize_udf = udf(categorize_age, StringType())

# Usage
df.withColumn("age_category", categorize_udf(col("age")))
df.withColumn("age_category", categorize_age_udf(col("age")))

5.2 Pandas UDFs (Better Performance)

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Scalar Pandas UDF (1:1 mapping, vectorized via Arrow)
@pandas_udf(StringType())
def categorize_pandas_udf(age_series: pd.Series) -> pd.Series:
    return age_series.apply(
        lambda x: "Unknown" if pd.isna(x)   # NULLs arrive as NaN/None in Pandas
        else "Young" if x < 30
        else "Middle" if x < 50
        else "Senior"
    )

# Usage
df.withColumn("age_category", categorize_pandas_udf(col("age")))

# Grouped-map processing (the function receives one Pandas DataFrame per group);
# applyInPandas replaces the PandasUDFType.GROUPED_MAP API deprecated in Spark 3.0
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

result_schema = StructType([
    StructField("department", StringType()),
    StructField("avg_salary", DoubleType()),
    StructField("employee_count", IntegerType()),
])

def analyze_department(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "department": [pdf["department"].iloc[0]],
        "avg_salary": [pdf["salary"].mean()],
        "employee_count": [len(pdf)],
    })

# Usage
df.groupBy("department").applyInPandas(analyze_department, schema=result_schema)

5.3 Using UDFs in SQL

# Register the UDF for use in SQL
spark.udf.register("categorize_age", categorize_age, StringType())

# SQL์—์„œ ์‚ฌ์šฉ
df.createOrReplaceTempView("employees")
spark.sql("""
    SELECT name, age, categorize_age(age) as age_category
    FROM employees
""").show()

6. ์œˆ๋„์šฐ ํ•จ์ˆ˜

from pyspark.sql.window import Window
from pyspark.sql.functions import (
    row_number, rank, dense_rank,
    lead, lag, sum as _sum, avg,
    first, last, ntile
)

# ์œˆ๋„์šฐ ์ •์˜
window_dept = Window.partitionBy("department").orderBy("salary")
window_all = Window.orderBy("salary")

# Ranking functions
df.withColumn("row_num", row_number().over(window_dept))
df.withColumn("rank", rank().over(window_dept))
df.withColumn("dense_rank", dense_rank().over(window_dept))
df.withColumn("ntile_4", ntile(4).over(window_dept))

# ์ด์ „/๋‹ค์Œ ๊ฐ’
df.withColumn("prev_salary", lag("salary", 1).over(window_dept))
df.withColumn("next_salary", lead("salary", 1).over(window_dept))

# ๋ˆ„์  ํ•ฉ๊ณ„
window_cumsum = Window.partitionBy("department") \
    .orderBy("date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("cumsum_salary", _sum("salary").over(window_cumsum))

# ์ด๋™ ํ‰๊ท 
window_moving = Window.partitionBy("department") \
    .orderBy("date") \
    .rowsBetween(-2, 0)  # current row plus the 2 preceding rows

df.withColumn("moving_avg", avg("salary").over(window_moving))

# First/last value in the window frame (note: with an ordered window's default
# frame, last() returns the current row's value; widen the frame to get the group's true last)
df.withColumn("first_name", first("name").over(window_dept))
df.withColumn("last_name", last("name").over(window_dept))

Exercises

Exercise 1: Data Transformation

From the sales data, compute the total and average revenue per month and category.

Exercise 2: Window Functions

Rank employees by salary within each department and extract the top 3 earners per department.

Exercise 3: Writing a UDF

Write a UDF that extracts the domain from an email address and apply it.


Summary

| Operation  | Description          | Example                      |
|------------|----------------------|------------------------------|
| select     | Select columns       | df.select("name", "age")     |
| filter     | Filter rows          | df.filter(col("age") > 30)   |
| groupBy    | Group rows           | df.groupBy("dept").agg(...)  |
| join       | Join DataFrames      | df1.join(df2, "key")         |
| orderBy    | Sort rows            | df.orderBy(desc("salary"))   |
| withColumn | Add/modify a column  | df.withColumn("new", ...)    |

์ฐธ๊ณ  ์ž๋ฃŒ

to navigate between lessons