"""
PySpark Word Count example.

A classic Word Count example that demonstrates Spark's basic operations.

Run with:
    spark-submit word_count.py
or:
    python word_count.py  (local mode)
"""
11
12from pyspark.sql import SparkSession
13from pyspark.sql.functions import explode, split, lower, regexp_replace, col, desc
14
15
def word_count_rdd(spark, text_data):
    """Count word frequencies with the low-level RDD API.

    Args:
        spark: active SparkSession.
        text_data: iterable of text lines to process.

    Returns:
        An RDD of (word, count) pairs sorted by count, descending.
    """
    banner = "=" * 50
    print(banner)
    print("RDD API Word Count")
    print(banner)

    sc = spark.sparkContext

    # Distribute the input lines as an RDD.
    lines_rdd = sc.parallelize(text_data)

    # Classic map/reduce word count: tokenize on whitespace, emit
    # (word, 1) pairs, sum per key, then order by frequency.
    tokens = lines_rdd.flatMap(lambda line: line.lower().split())
    pairs = tokens.map(lambda token: (token, 1))
    totals = pairs.reduceByKey(lambda a, b: a + b)
    word_counts = totals.sortBy(lambda kv: kv[1], ascending=False)

    # Show the ten most frequent words.
    print("\nTop 10 words (RDD):")
    for word, count in word_counts.take(10):
        print(f" {word}: {count}")

    return word_counts
42
43
def word_count_df(spark, text_data):
    """Count word frequencies with the DataFrame API.

    Args:
        spark: active SparkSession.
        text_data: iterable of text lines to process.

    Returns:
        A DataFrame with columns (word, count), sorted by count descending.
    """
    print("\n" + "=" * 50)
    print("DataFrame API Word Count")
    print("=" * 50)

    # One row per input line.
    df = spark.createDataFrame([(line,) for line in text_data], ["line"])

    # Pipeline as named stages: tokenize on whitespace, strip
    # non-alphanumeric characters, drop empty tokens, then count.
    tokens = df.select(explode(split(lower(col("line")), r"\s+")).alias("word"))
    cleaned = tokens.withColumn("word", regexp_replace(col("word"), r"[^a-z0-9]", ""))
    nonempty = cleaned.filter(col("word") != "")
    word_counts = nonempty.groupBy("word").count().orderBy(desc("count"))

    print("\nTop 10 words (DataFrame):")
    word_counts.show(10, truncate=False)

    return word_counts
72
73
def word_count_sql(spark, text_data):
    """Count word frequencies with Spark SQL over a temporary view.

    Args:
        spark: active SparkSession.
        text_data: iterable of text lines to process.

    Returns:
        A DataFrame with columns (word, count), sorted by count descending.
    """
    print("\n" + "=" * 50)
    print("Spark SQL Word Count")
    print("=" * 50)

    # Create a DataFrame and register it as a temp view for SQL access.
    df = spark.createDataFrame([(line,) for line in text_data], ["line"])
    df.createOrReplaceTempView("lines")

    # Word count in SQL: tokenize, strip non-alphanumerics, filter empty
    # tokens both before and after cleaning, then group and count.
    # NOTE: '\\\\s+' in this Python literal reaches Spark SQL as the
    # regex \s+ (Python unescapes once, the SQL parser unescapes again).
    word_counts = spark.sql("""
        WITH words AS (
            SELECT explode(split(lower(line), '\\\\s+')) AS word
            FROM lines
        ),
        cleaned_words AS (
            SELECT regexp_replace(word, '[^a-z0-9]', '') AS word
            FROM words
            WHERE word != ''
        )
        SELECT
            word,
            COUNT(*) AS count
        FROM cleaned_words
        WHERE word != ''
        GROUP BY word
        ORDER BY count DESC
    """)

    print("\nTop 10 words (SQL):")
    word_counts.show(10, truncate=False)

    return word_counts
108
109
def main():
    """Run word count with three Spark APIs, save results, print stats."""
    # Build a local SparkSession using all cores, with Adaptive Query
    # Execution enabled.
    spark = SparkSession.builder \
        .appName("Word Count Example") \
        .master("local[*]") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()

    # Reduce log noise.
    spark.sparkContext.setLogLevel("WARN")

    # Sample text data.
    text_data = [
        "Apache Spark is a unified analytics engine for large-scale data processing",
        "Spark provides high-level APIs in Java, Scala, Python and R",
        "Spark also supports a rich set of higher-level tools",
        "Spark SQL for structured data processing",
        "MLlib for machine learning",
        "GraphX for graph processing",
        "Spark Streaming for real-time data processing",
        "Spark is designed to be fast and general-purpose",
        "Spark extends the popular MapReduce model",
        "Spark supports in-memory computing which can improve performance",
        "Data processing with Spark is efficient and scalable",
        "Spark can process data from various sources like HDFS, S3, Kafka",
    ]

    print("Sample Data:")
    for line in text_data[:3]:
        print(f" {line}")
    print(f" ... (total {len(text_data)} lines)")

    # Run Word Count with all three APIs. Keep the DataFrame result so
    # it is reused below — the original called word_count_df() a second
    # time before saving, recomputing (and reprinting) the same result.
    word_count_rdd(spark, text_data)
    word_counts = word_count_df(spark, text_data)
    word_count_sql(spark, text_data)

    # Example of persisting results.
    print("\n" + "=" * 50)
    print("Saving Results")
    print("=" * 50)

    output_path = "/tmp/word_count_output"

    # Save as Parquet.
    word_counts.write.mode("overwrite").parquet(f"{output_path}/parquet")
    print(f"Saved to {output_path}/parquet")

    # Save as CSV (coalesced to a single output file).
    word_counts.coalesce(1).write.mode("overwrite").csv(
        f"{output_path}/csv",
        header=True
    )
    print(f"Saved to {output_path}/csv")

    # Summary statistics.
    print("\n" + "=" * 50)
    print("Statistics")
    print("=" * 50)
    print(f"Total unique words: {word_counts.count()}")
    print(f"Total word occurrences: {word_counts.agg({'count': 'sum'}).collect()[0][0]}")

    # Shut down the SparkSession.
    spark.stop()
175
176
# Script entry point: run the demo only when executed directly.
if __name__ == "__main__":
    main()