data_processing.py

  1"""
  2PySpark DataFrame ์ฒ˜๋ฆฌ ์˜ˆ์ œ
  3
  4์‹ค์ œ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์‹œ๋‚˜๋ฆฌ์˜ค๋ฅผ ๋ณด์—ฌ์ฃผ๋Š” ์˜ˆ์ œ์ž…๋‹ˆ๋‹ค:
  5- ๋ฐ์ดํ„ฐ ์ƒ์„ฑ ๋ฐ ๋กœ๋”ฉ
  6- ๋ณ€ํ™˜ (ํ•„ํ„ฐ, ์ง‘๊ณ„, ์กฐ์ธ)
  7- ์œˆ๋„์šฐ ํ•จ์ˆ˜
  8- UDF (User Defined Functions)
  9
 10์‹คํ–‰:
 11  spark-submit data_processing.py
 12"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, sum as _sum, avg, count, countDistinct,
    to_date, year, month,
    row_number, rank, lag, lead,
    udf, pandas_udf
)
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType
)
from datetime import datetime, timedelta
import random
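# pandas backs the vectorized pandas UDF sketch further down; pandas_udf
# also needs pyarrow installed at runtime.
import pandas as pd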


def create_sample_data(spark):
    """Create sample customer, product, and order data."""
    print("Creating sample data...")

    # Customer data
    customers = [
        (1, "Alice", "Gold", "New York", "2020-01-15"),
        (2, "Bob", "Silver", "Los Angeles", "2020-03-20"),
        (3, "Charlie", "Bronze", "Chicago", "2021-06-10"),
        (4, "Diana", "Gold", "Houston", "2019-11-05"),
        (5, "Eve", "Silver", "Phoenix", "2022-02-28"),
    ]

    customers_schema = StructType([
        StructField("customer_id", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("segment", StringType(), True),
        StructField("city", StringType(), True),
        StructField("join_date", StringType(), True),
    ])

    customers_df = spark.createDataFrame(customers, customers_schema)
    customers_df = customers_df.withColumn("join_date", to_date(col("join_date")))
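    # join_date arrives as a string and is cast with to_date(), which parses
    # the default "yyyy-MM-dd" format; the explicit schema above also skips
    # Spark's type-inference pass over the Python rows.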

    # Product data
    products = [
        (101, "Laptop", "Electronics", 999.99),
        (102, "Phone", "Electronics", 599.99),
        (103, "Tablet", "Electronics", 399.99),
        (104, "Headphones", "Electronics", 149.99),
        (105, "T-Shirt", "Clothing", 29.99),
        (106, "Jeans", "Clothing", 59.99),
        (107, "Sneakers", "Footwear", 89.99),
    ]

    products_df = spark.createDataFrame(
        products,
        ["product_id", "name", "category", "price"]
    )

    # Generate order data
    random.seed(42)
    orders = []
    order_id = 1
    base_date = datetime(2024, 1, 1)

    for day in range(90):  # 90 days of data
        order_date = (base_date + timedelta(days=day)).strftime("%Y-%m-%d")
        num_orders = random.randint(5, 15)

        for _ in range(num_orders):
            orders.append((
                order_id,
                random.choice([1, 2, 3, 4, 5]),
                random.choice([101, 102, 103, 104, 105, 106, 107]),
                random.randint(1, 5),
                round(random.uniform(50, 500), 2),
                random.choice(["completed", "completed", "completed", "pending", "cancelled"]),
                order_date
            ))
            order_id += 1

    orders_df = spark.createDataFrame(
        orders,
        ["order_id", "customer_id", "product_id", "quantity", "amount", "status", "order_date"]
    )
    orders_df = orders_df.withColumn("order_date", to_date(col("order_date")))

    return customers_df, products_df, orders_df


def basic_transformations(orders_df):
    """Basic transformation operations."""
    print("\n" + "=" * 60)
    print("Basic Transformations")
    print("=" * 60)

    # Filtering
    completed_orders = orders_df.filter(col("status") == "completed")
    print(f"\nCompleted orders: {completed_orders.count()}")
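    # filter() is a lazy transformation; the count() above is the action
    # that actually triggers a Spark job.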

    # Add derived columns
    enhanced_df = orders_df.withColumn(
        "order_tier",
        when(col("amount") > 300, "high")
        .when(col("amount") > 150, "medium")
        .otherwise("low")
    ).withColumn(
        "order_month", month(col("order_date"))
    ).withColumn(
        "order_year", year(col("order_date"))
    )
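    # when() chains evaluate top-down: the first matching branch wins, and
    # rows matching no branch (including null amounts) fall into otherwise().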

    print("\nEnhanced DataFrame:")
    enhanced_df.select("order_id", "amount", "order_tier", "order_date", "order_month").show(5)

    return enhanced_df


def aggregations(orders_df, customers_df):
    """Aggregation operations."""
    print("\n" + "=" * 60)
    print("Aggregations")
    print("=" * 60)

    # Overall statistics
    print("\nOverall Statistics:")
    orders_df.filter(col("status") == "completed").agg(
        count("*").alias("total_orders"),
        _sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
        countDistinct("customer_id").alias("unique_customers")
    ).show()

    # Monthly statistics
    print("\nMonthly Statistics:")
    monthly_stats = orders_df \
        .filter(col("status") == "completed") \
        .withColumn("month", month(col("order_date"))) \
        .groupBy("month") \
        .agg(
            count("*").alias("orders"),
            _sum("amount").alias("revenue"),
            avg("amount").alias("avg_value")
        ) \
        .orderBy("month")

    monthly_stats.show()

    # Statistics by customer segment
    print("\nCustomer Segment Statistics:")
    segment_stats = orders_df \
        .filter(col("status") == "completed") \
        .join(customers_df, "customer_id") \
        .groupBy("segment") \
        .agg(
            count("*").alias("orders"),
            _sum("amount").alias("revenue"),
            countDistinct("customer_id").alias("customers")
        ) \
        .withColumn("revenue_per_customer", col("revenue") / col("customers")) \
        .orderBy(col("revenue").desc())

    segment_stats.show()

    return monthly_stats


def window_functions(orders_df):
    """Window functions."""
    print("\n" + "=" * 60)
    print("Window Functions")
    print("=" * 60)

    completed = orders_df.filter(col("status") == "completed")

    # Per-customer window
    customer_window = Window.partitionBy("customer_id").orderBy("order_date")
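    # With an orderBy and no explicit frame, aggregates like sum().over() use
    # the default frame (unbounded preceding through the current row), so
    # cumulative_amount below is a running total.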

    # Per-customer cumulative spend and order sequence
    customer_analysis = completed \
        .withColumn("order_rank", row_number().over(customer_window)) \
        .withColumn("cumulative_amount", _sum("amount").over(customer_window)) \
        .withColumn("prev_order_amount", lag("amount", 1).over(customer_window)) \
        .withColumn("next_order_amount", lead("amount", 1).over(customer_window))

    print("\nCustomer Order Analysis:")
    customer_analysis \
        .filter(col("customer_id") == 1) \
        .select(
            "customer_id", "order_date", "amount",
            "order_rank", "cumulative_amount",
            "prev_order_amount", "next_order_amount"
        ) \
        .orderBy("order_date") \
        .show(10)

    # Daily revenue ranking
    daily_window = Window.orderBy(col("daily_revenue").desc())
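    # Caution: a window with orderBy but no partitionBy pulls every row into
    # a single partition (Spark logs a warning); acceptable for this small demo.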

    daily_ranking = completed \
        .groupBy("order_date") \
        .agg(_sum("amount").alias("daily_revenue")) \
        .withColumn("rank", rank().over(daily_window))

    print("\nTop 5 Revenue Days:")
    daily_ranking.filter(col("rank") <= 5).show()

    return customer_analysis


def join_operations(customers_df, products_df, orders_df):
    """Join operations."""
    print("\n" + "=" * 60)
    print("Join Operations")
    print("=" * 60)

    # Three-way join. products_df also has a "name" column, so rename it
    # first to avoid an ambiguous-reference error when selecting "name".
    full_data = orders_df \
        .join(customers_df, "customer_id", "left") \
        .join(products_df.withColumnRenamed("name", "product_name"),
              "product_id", "left")
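    # customers_df and products_df are tiny, so Spark will normally plan
    # broadcast hash joins here (see spark.sql.autoBroadcastJoinThreshold);
    # pyspark.sql.functions.broadcast() can force the hint explicitly.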

    print("\nJoined Data Sample:")
    full_data.select(
        "order_id", "name", "segment", "product_name", "category", "amount", "status"
    ).show(5)

    # Revenue by category and segment
    category_segment = full_data \
        .filter(col("status") == "completed") \
        .groupBy("category", "segment") \
        .agg(
            count("*").alias("orders"),
            _sum("amount").alias("revenue")
        ) \
        .orderBy("category", "segment")

    print("\nRevenue by Category and Segment:")
    category_segment.show()

    return full_data


def user_defined_functions(orders_df):
    """Using UDFs."""
    print("\n" + "=" * 60)
    print("User Defined Functions")
    print("=" * 60)

    # Python UDF
    @udf(returnType=StringType())
    def categorize_amount(amount):
        if amount is None:
            return "unknown"
        elif amount > 400:
            return "premium"
        elif amount > 200:
            return "standard"
        else:
            return "economy"

    # Apply the UDF
    with_category = orders_df.withColumn(
        "amount_category",
        categorize_amount(col("amount"))
    )

    print("\nWith Amount Category:")
    with_category.select("order_id", "amount", "amount_category").show(5)
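
    # Vectorized alternative (a sketch, assuming pandas and pyarrow are
    # installed): a pandas UDF receives whole pandas Series per batch over
    # Arrow, avoiding the row-at-a-time overhead of the plain UDF above.
    @pandas_udf(StringType())
    def categorize_amount_fast(amount: pd.Series) -> pd.Series:
        # Same thresholds as categorize_amount, applied vectorized.
        result = pd.Series("economy", index=amount.index)
        result[amount > 200] = "standard"
        result[amount > 400] = "premium"
        result[amount.isna()] = "unknown"
        return result

    print("\nSame categories via pandas UDF:")
    orders_df.select(
        "order_id", "amount",
        categorize_amount_fast(col("amount")).alias("amount_category")
    ).show(5)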

    # Statistics by amount category
    print("\nStatistics by Amount Category:")
    with_category \
        .filter(col("status") == "completed") \
        .groupBy("amount_category") \
        .agg(
            count("*").alias("count"),
            avg("amount").alias("avg_amount")
        ) \
        .orderBy(col("avg_amount").desc()) \
        .show()

    return with_category


def save_results(spark, df, output_path):
    """Save results."""
    print("\n" + "=" * 60)
    print("Saving Results")
    print("=" * 60)

    # Save as Parquet, partitioned by status
    df.write \
        .mode("overwrite") \
        .partitionBy("status") \
        .parquet(f"{output_path}/orders_parquet")

    print(f"Saved to {output_path}/orders_parquet")
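
    # Read the data back to verify the write; filtering on the partition
    # column lets Spark prune directories instead of scanning everything.
    reloaded = spark.read.parquet(f"{output_path}/orders_parquet")
    print(f"Reloaded {reloaded.count()} rows, "
          f"{reloaded.filter(col('status') == 'completed').count()} completed")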

    # Delta Lake format (if configured):
    # df.write.format("delta").mode("overwrite").save(f"{output_path}/orders_delta")


def main():
    # Create the SparkSession
    spark = SparkSession.builder \
        .appName("Data Processing Example") \
        .master("local[*]") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.shuffle.partitions", "10") \
        .getOrCreate()
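    # Adaptive Query Execution re-optimizes shuffles at runtime; shuffle
    # partitions are lowered from the default of 200 since the data is tiny.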

    spark.sparkContext.setLogLevel("WARN")

    # Create sample data
    customers_df, products_df, orders_df = create_sample_data(spark)

    print("=" * 60)
    print("Data Overview")
    print("=" * 60)
    print(f"Customers: {customers_df.count()} rows")
    print(f"Products: {products_df.count()} rows")
    print(f"Orders: {orders_df.count()} rows")

    print("\nCustomers Schema:")
    customers_df.printSchema()

    print("\nOrders Sample:")
    orders_df.show(5)

    # Run the transformations
    enhanced_orders = basic_transformations(orders_df)
    monthly_stats = aggregations(orders_df, customers_df)
    customer_analysis = window_functions(orders_df)
    full_data = join_operations(customers_df, products_df, orders_df)
    categorized = user_defined_functions(orders_df)

    # Save results
    save_results(spark, enhanced_orders, "/tmp/spark_output")

    # Stop the SparkSession
    spark.stop()


if __name__ == "__main__":
    main()