# word_count.py
"""
PySpark Word Count ์˜ˆ์ œ

Spark์˜ ๊ธฐ๋ณธ ๋™์ž‘์„ ๋ณด์—ฌ์ฃผ๋Š” ํด๋ž˜์‹ํ•œ Word Count ์˜ˆ์ œ์ž…๋‹ˆ๋‹ค.

์‹คํ–‰:
  spark-submit word_count.py
  ๋˜๋Š”
  python word_count.py (๋กœ์ปฌ ๋ชจ๋“œ)
"""

import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, regexp_replace, col, desc


 16def word_count_rdd(spark, text_data):
 17    """RDD API๋ฅผ ์‚ฌ์šฉํ•œ Word Count"""
 18    print("=" * 50)
 19    print("RDD API Word Count")
 20    print("=" * 50)
 21
 22    sc = spark.sparkContext
 23
 24    # RDD ์ƒ์„ฑ
 25    lines_rdd = sc.parallelize(text_data)
 26
 27    # Word Count ์ฒ˜๋ฆฌ
 28    word_counts = (
 29        lines_rdd
 30        .flatMap(lambda line: line.lower().split())  # ๋‹จ์–ด๋กœ ๋ถ„๋ฆฌ
 31        .map(lambda word: (word, 1))                  # (word, 1) ์Œ ์ƒ์„ฑ
 32        .reduceByKey(lambda a, b: a + b)              # ๋‹จ์–ด๋ณ„ ํ•ฉ๊ณ„
 33        .sortBy(lambda x: x[1], ascending=False)      # ๋นˆ๋„์ˆœ ์ •๋ ฌ
 34    )
 35
 36    # ๊ฒฐ๊ณผ ์ถœ๋ ฅ
 37    print("\nTop 10 words (RDD):")
 38    for word, count in word_counts.take(10):
 39        print(f"  {word}: {count}")
 40
 41    return word_counts
 42
 43
 44def word_count_df(spark, text_data):
 45    """DataFrame API๋ฅผ ์‚ฌ์šฉํ•œ Word Count"""
 46    print("\n" + "=" * 50)
 47    print("DataFrame API Word Count")
 48    print("=" * 50)
 49
 50    # DataFrame ์ƒ์„ฑ
 51    df = spark.createDataFrame([(line,) for line in text_data], ["line"])
 52
 53    # Word Count ์ฒ˜๋ฆฌ
 54    word_counts = (
 55        df
 56        .select(explode(split(lower(col("line")), r"\s+")).alias("word"))
 57        # ํŠน์ˆ˜๋ฌธ์ž ์ œ๊ฑฐ
 58        .withColumn("word", regexp_replace(col("word"), r"[^a-z0-9]", ""))
 59        # ๋นˆ ๋ฌธ์ž์—ด ์ œ์™ธ
 60        .filter(col("word") != "")
 61        # ๊ทธ๋ฃนํ™” ๋ฐ ์นด์šดํŠธ
 62        .groupBy("word")
 63        .count()
 64        # ์ •๋ ฌ
 65        .orderBy(desc("count"))
 66    )
 67
 68    print("\nTop 10 words (DataFrame):")
 69    word_counts.show(10, truncate=False)
 70
 71    return word_counts
 72
 73
def word_count_sql(spark, text_data):
    """Word Count using Spark SQL.

    Registers the input lines as a temp view, then runs the whole
    tokenize/clean/count pipeline as a single SQL query.

    Args:
        spark: Active SparkSession.
        text_data: List of text lines to count words in.

    Returns:
        DataFrame with (word, count) rows sorted by count, descending.
    """
    print("\n" + "=" * 50)
    print("Spark SQL Word Count")
    print("=" * 50)

    # Create a DataFrame and register it as a temp view for SQL access.
    df = spark.createDataFrame([(line,) for line in text_data], ["line"])
    df.createOrReplaceTempView("lines")

    # Word Count via SQL.
    # NOTE: '\\\\s+' in this (non-raw) Python literal becomes '\\s+' in
    # the SQL text, which Spark's SQL parser in turn reads as the regex
    # \s+ — both escaping layers are intentional.
    # The inner WHERE filters empty raw tokens; the outer WHERE filters
    # tokens that became empty after regexp_replace stripped punctuation.
    word_counts = spark.sql("""
        WITH words AS (
            SELECT explode(split(lower(line), '\\\\s+')) AS word
            FROM lines
        ),
        cleaned_words AS (
            SELECT regexp_replace(word, '[^a-z0-9]', '') AS word
            FROM words
            WHERE word != ''
        )
        SELECT
            word,
            COUNT(*) AS count
        FROM cleaned_words
        WHERE word != ''
        GROUP BY word
        ORDER BY count DESC
    """)

    print("\nTop 10 words (SQL):")
    word_counts.show(10, truncate=False)

    return word_counts


def main():
    """Run the three word-count demos, save the results, print statistics."""
    # Create the SparkSession (local mode, all cores).
    spark = SparkSession.builder \
        .appName("Word Count Example") \
        .master("local[*]") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()

    # Reduce log noise.
    spark.sparkContext.setLogLevel("WARN")

    # Sample text data.
    text_data = [
        "Apache Spark is a unified analytics engine for large-scale data processing",
        "Spark provides high-level APIs in Java, Scala, Python and R",
        "Spark also supports a rich set of higher-level tools",
        "Spark SQL for structured data processing",
        "MLlib for machine learning",
        "GraphX for graph processing",
        "Spark Streaming for real-time data processing",
        "Spark is designed to be fast and general-purpose",
        "Spark extends the popular MapReduce model",
        "Spark supports in-memory computing which can improve performance",
        "Data processing with Spark is efficient and scalable",
        "Spark can process data from various sources like HDFS, S3, Kafka",
    ]

    print("Sample Data:")
    for line in text_data[:3]:
        print(f"  {line}")
    print(f"  ... (total {len(text_data)} lines)")

    # Run Word Count in all three styles. Keep the DataFrame result:
    # it is reused below for saving and statistics (previously
    # word_count_df was executed a second time just to get this handle).
    word_count_rdd(spark, text_data)
    word_counts = word_count_df(spark, text_data)
    word_count_sql(spark, text_data)

    # Save the results.
    print("\n" + "=" * 50)
    print("Saving Results")
    print("=" * 50)

    output_path = "/tmp/word_count_output"

    # Cache: four actions follow (two writes, count, agg), so avoid
    # recomputing the aggregation each time.
    word_counts.cache()

    # Save as Parquet.
    word_counts.write.mode("overwrite").parquet(f"{output_path}/parquet")
    print(f"Saved to {output_path}/parquet")

    # Save as CSV (coalesce(1) -> single output file).
    word_counts.coalesce(1).write.mode("overwrite").csv(
        f"{output_path}/csv",
        header=True
    )
    print(f"Saved to {output_path}/csv")

    # Print statistics.
    print("\n" + "=" * 50)
    print("Statistics")
    print("=" * 50)
    print(f"Total unique words: {word_counts.count()}")
    print(f"Total word occurrences: {word_counts.agg({'count': 'sum'}).collect()[0][0]}")

    # Shut down the SparkSession.
    spark.stop()


if __name__ == "__main__":
    main()