"""
PySpark DataFrame processing examples.

Demonstrates common data-processing scenarios:
- data creation and loading
- transformations (filter, aggregate, join)
- window functions
- UDFs (User Defined Functions)

Run:
    spark-submit data_processing.py
"""
13
14from pyspark.sql import SparkSession
15from pyspark.sql.functions import (
16 col, lit, when, sum as _sum, avg, count, countDistinct,
17 to_date, year, month, dayofweek,
18 row_number, rank, lag, lead,
19 udf, pandas_udf
20)
21from pyspark.sql.window import Window
22from pyspark.sql.types import (
23 StructType, StructField, StringType, IntegerType,
24 DoubleType, DateType, TimestampType
25)
26from datetime import datetime, timedelta
27import random
28
29
def create_sample_data(spark):
    """Build three demo DataFrames: customers, products, and 90 days of orders.

    Args:
        spark: Active SparkSession used to create the DataFrames.

    Returns:
        Tuple of (customers_df, products_df, orders_df).
    """
    print("Creating sample data...")

    # -- Customers -------------------------------------------------------
    customer_rows = [
        (1, "Alice", "Gold", "New York", "2020-01-15"),
        (2, "Bob", "Silver", "Los Angeles", "2020-03-20"),
        (3, "Charlie", "Bronze", "Chicago", "2021-06-10"),
        (4, "Diana", "Gold", "Houston", "2019-11-05"),
        (5, "Eve", "Silver", "Phoenix", "2022-02-28"),
    ]

    customer_schema = StructType([
        StructField("customer_id", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("segment", StringType(), True),
        StructField("city", StringType(), True),
        StructField("join_date", StringType(), True),
    ])

    # join_date arrives as a string literal; cast it to a real DateType column.
    customers_df = (
        spark.createDataFrame(customer_rows, customer_schema)
        .withColumn("join_date", to_date(col("join_date")))
    )

    # -- Products --------------------------------------------------------
    product_rows = [
        (101, "Laptop", "Electronics", 999.99),
        (102, "Phone", "Electronics", 599.99),
        (103, "Tablet", "Electronics", 399.99),
        (104, "Headphones", "Electronics", 149.99),
        (105, "T-Shirt", "Clothing", 29.99),
        (106, "Jeans", "Clothing", 59.99),
        (107, "Sneakers", "Footwear", 89.99),
    ]

    products_df = spark.createDataFrame(
        product_rows,
        ["product_id", "name", "category", "price"]
    )

    # -- Orders: 5-15 synthetic orders per day over 90 days ---------------
    random.seed(42)  # fixed seed so the demo data is reproducible
    order_rows = []
    next_order_id = 1
    start_date = datetime(2024, 1, 1)

    for day_offset in range(90):  # 90 days of data
        day_str = (start_date + timedelta(days=day_offset)).strftime("%Y-%m-%d")

        for _ in range(random.randint(5, 15)):
            order_rows.append((
                next_order_id,
                random.choice([1, 2, 3, 4, 5]),                       # customer_id
                random.choice([101, 102, 103, 104, 105, 106, 107]),   # product_id
                random.randint(1, 5),                                 # quantity
                round(random.uniform(50, 500), 2),                    # amount
                # "completed" is weighted 3:1:1 vs pending/cancelled
                random.choice(["completed", "completed", "completed", "pending", "cancelled"]),
                day_str
            ))
            next_order_id += 1

    orders_df = spark.createDataFrame(
        order_rows,
        ["order_id", "customer_id", "product_id", "quantity", "amount", "status", "order_date"]
    )
    # Same string-to-date cast as for customers.join_date above.
    orders_df = orders_df.withColumn("order_date", to_date(col("order_date")))

    return customers_df, products_df, orders_df
99
100
def basic_transformations(orders_df):
    """Demonstrate basic transformations: filtering and derived columns.

    Args:
        orders_df: Orders DataFrame with amount, status and order_date columns.

    Returns:
        DataFrame with order_tier, order_month and order_year columns added.
    """
    print("\n" + "=" * 60)
    print("Basic Transformations")
    print("=" * 60)

    # Filtering
    done = orders_df.filter(col("status") == "completed")
    print(f"\nCompleted orders: {done.count()}")

    # Derived columns: bucket orders by amount, extract date parts
    enriched = (
        orders_df
        .withColumn(
            "order_tier",
            when(col("amount") > 300, "high")
            .when(col("amount") > 150, "medium")
            .otherwise("low"),
        )
        .withColumn("order_month", month(col("order_date")))
        .withColumn("order_year", year(col("order_date")))
    )

    print("\nEnhanced DataFrame:")
    enriched.select("order_id", "amount", "order_tier", "order_date", "order_month").show(5)

    return enriched
127
128
def aggregations(orders_df, customers_df):
    """Demonstrate aggregations: overall, per-month and per-segment stats.

    Args:
        orders_df: Order facts (amount, status, order_date, customer_id).
        customers_df: Customer dimension joined in for segment stats.

    Returns:
        The monthly statistics DataFrame (orders / revenue / avg_value).
    """
    print("\n" + "=" * 60)
    print("Aggregations")
    print("=" * 60)

    # All three summaries only look at completed orders; filter once.
    completed = orders_df.filter(col("status") == "completed")

    # Overall statistics
    print("\nOverall Statistics:")
    completed.agg(
        count("*").alias("total_orders"),
        _sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
        countDistinct("customer_id").alias("unique_customers"),
    ).show()

    # Monthly statistics
    print("\nMonthly Statistics:")
    monthly_stats = (
        completed
        .withColumn("month", month(col("order_date")))
        .groupBy("month")
        .agg(
            count("*").alias("orders"),
            _sum("amount").alias("revenue"),
            avg("amount").alias("avg_value"),
        )
        .orderBy("month")
    )

    monthly_stats.show()

    # Per-segment statistics (needs customer attributes, hence the join)
    print("\nCustomer Segment Statistics:")
    segment_stats = (
        completed
        .join(customers_df, "customer_id")
        .groupBy("segment")
        .agg(
            count("*").alias("orders"),
            _sum("amount").alias("revenue"),
            countDistinct("customer_id").alias("customers"),
        )
        .withColumn("revenue_per_customer", col("revenue") / col("customers"))
        .orderBy(col("revenue").desc())
    )

    segment_stats.show()

    return monthly_stats
176
177
def window_functions(orders_df, customers_df):
    """Demonstrate window functions: ranking, running totals, lag/lead.

    Args:
        orders_df: Order facts.
        customers_df: Unused here; kept so the signature matches its caller.

    Returns:
        Per-customer order analysis DataFrame (rank, cumulative amount,
        previous/next order amount).
    """
    print("\n" + "=" * 60)
    print("Window Functions")
    print("=" * 60)

    completed = orders_df.filter(col("status") == "completed")

    # Per-customer window, ordered chronologically
    per_customer = Window.partitionBy("customer_id").orderBy("order_date")

    # Order rank, running total, and neighboring order amounts per customer
    customer_analysis = (
        completed
        .withColumn("order_rank", row_number().over(per_customer))
        .withColumn("cumulative_amount", _sum("amount").over(per_customer))
        .withColumn("prev_order_amount", lag("amount", 1).over(per_customer))
        .withColumn("next_order_amount", lead("amount", 1).over(per_customer))
    )

    print("\nCustomer Order Analysis:")
    (
        customer_analysis
        .filter(col("customer_id") == 1)
        .select(
            "customer_id", "order_date", "amount",
            "order_rank", "cumulative_amount",
            "prev_order_amount", "next_order_amount",
        )
        .orderBy("order_date")
        .show(10)
    )

    # Global ranking of days by revenue.
    # NOTE: a Window without partitionBy collapses all rows into a single
    # partition; acceptable for this small demo dataset.
    revenue_rank = Window.orderBy(col("daily_revenue").desc())

    daily_ranking = (
        completed
        .groupBy("order_date")
        .agg(_sum("amount").alias("daily_revenue"))
        .withColumn("rank", rank().over(revenue_rank))
    )

    print("\nTop 5 Revenue Days:")
    daily_ranking.filter(col("rank") <= 5).show()

    return customer_analysis
219
220
def join_operations(customers_df, products_df, orders_df):
    """Join orders with customer and product attributes and summarize revenue.

    Bug fix: customers_df and products_df both expose a column literally
    named "name", so after joining both into the orders the unqualified
    ``select("name")`` below was ambiguous and Spark raised an
    AnalysisException (AMBIGUOUS_REFERENCE). The product name is renamed
    to "product_name" before the join so every column in the result is
    unique; "name" then unambiguously means the customer name.

    Args:
        customers_df: Customer dimension (customer_id, name, segment, city, ...).
        products_df: Product dimension (product_id, name, category, price).
        orders_df: Order facts.

    Returns:
        The fully joined orders DataFrame (product name as "product_name").
    """
    print("\n" + "=" * 60)
    print("Join Operations")
    print("=" * 60)

    # Disambiguate the product name before joining (see docstring).
    products_renamed = products_df.withColumnRenamed("name", "product_name")

    # Three-table join; left joins keep every order even without a match
    full_data = orders_df \
        .join(customers_df, "customer_id", "left") \
        .join(products_renamed, "product_id", "left")

    print("\nJoined Data Sample:")
    full_data.select(
        "order_id", "name", "segment", "category", "amount", "status"
    ).show(5)

    # Revenue by category and segment
    category_segment = full_data \
        .filter(col("status") == "completed") \
        .groupBy("category", "segment") \
        .agg(
            count("*").alias("orders"),
            _sum("amount").alias("revenue")
        ) \
        .orderBy("category", "segment")

    print("\nRevenue by Category and Segment:")
    category_segment.show()

    return full_data
251
252
def user_defined_functions(orders_df):
    """Demonstrate a Python UDF that buckets order amounts.

    Args:
        orders_df: Orders DataFrame with amount and status columns.

    Returns:
        DataFrame with an added amount_category column.
    """
    print("\n" + "=" * 60)
    print("User Defined Functions")
    print("=" * 60)

    # Row-at-a-time Python UDF — fine for a demo, but slower than built-in
    # column expressions on real workloads.
    @udf(returnType=StringType())
    def categorize_amount(amount):
        if amount is None:
            return "unknown"
        if amount > 400:
            return "premium"
        if amount > 200:
            return "standard"
        return "economy"

    # Apply the UDF as a new column
    with_category = orders_df.withColumn(
        "amount_category", categorize_amount(col("amount"))
    )

    print("\nWith Amount Category:")
    with_category.select("order_id", "amount", "amount_category").show(5)

    # Summary per amount bucket
    print("\nStatistics by Amount Category:")
    (
        with_category
        .filter(col("status") == "completed")
        .groupBy("amount_category")
        .agg(
            count("*").alias("count"),
            avg("amount").alias("avg_amount"),
        )
        .orderBy(col("avg_amount").desc())
        .show()
    )

    return with_category
293
294
def save_results(spark, df, output_path):
    """Write *df* to disk as Parquet, partitioned by order status.

    Args:
        spark: SparkSession; currently unused, kept for caller compatibility.
        df: DataFrame to persist (must contain a "status" column).
        output_path: Base directory for the output files.
    """
    print("\n" + "=" * 60)
    print("Saving Results")
    print("=" * 60)

    # Parquet output, one sub-directory per distinct status value
    (
        df.write
        .mode("overwrite")
        .partitionBy("status")
        .parquet(f"{output_path}/orders_parquet")
    )

    print(f"Saved to {output_path}/orders_parquet")

    # Delta Lake variant (when a Delta-enabled environment is available):
    # df.write.format("delta").mode("overwrite").save(f"{output_path}/orders_delta")
311
312
def main():
    """Entry point: build a local SparkSession and run every demo step."""
    spark = (
        SparkSession.builder
        .appName("Data Processing Example")
        .master("local[*]")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.shuffle.partitions", "10")
        .getOrCreate()
    )

    # Keep the console output readable
    spark.sparkContext.setLogLevel("WARN")

    # Build the demo datasets
    customers_df, products_df, orders_df = create_sample_data(spark)

    print("=" * 60)
    print("Data Overview")
    print("=" * 60)
    print(f"Customers: {customers_df.count()} rows")
    print(f"Products: {products_df.count()} rows")
    print(f"Orders: {orders_df.count()} rows")

    print("\nCustomers Schema:")
    customers_df.printSchema()

    print("\nOrders Sample:")
    orders_df.show(5)

    # Run each transformation demo in turn
    enhanced_orders = basic_transformations(orders_df)
    monthly_stats = aggregations(orders_df, customers_df)
    customer_analysis = window_functions(orders_df, customers_df)
    full_data = join_operations(customers_df, products_df, orders_df)
    categorized = user_defined_functions(orders_df)

    # Persist one of the result sets
    save_results(spark, enhanced_orders, "/tmp/spark_output")

    # Tear the session down
    spark.stop()
352
353
# Standard entry-point guard: allows `spark-submit data_processing.py`
# (or plain `python`) without side effects on import.
if __name__ == "__main__":
    main()