Scala、Python、SQL で Apache Spark DataFrames と Spark SQL を使用した集計をマスターする
- Claude Paugh

- 7月24日
- 読了時間: 5分
更新日:8月18日
ビッグデータの力を最大限に活用したいなら、Apache Spark が頼りになるフレームワークです。堅牢な API と充実したエコシステムを備え、大規模データセットの処理に最適です。特に、DataFrame と Spark SQL を用いた集計機能は、Spark を非常に貴重なツールにしています。この記事では、Scala と Python の両方を用いて、Spark DataFrame と Spark SQL を用いた集計の実行方法を解説します。実践的なコード例も紹介し、理解を深めます。
Spark DataFramesを理解する
Spark DataFramesは、リレーショナルデータベースのテーブルに似た、名前付き列に整理された分散データコレクションです。この構造により、効率的なデータ操作が可能になります。DataFramesは、クエリ最適化のためのCatalystやメモリ管理のためのTungstenエンジンなどの機能により、操作のパフォーマンスを向上させます。
DataFramesの構文はシンプルで直感的であり、関数型演算やSQLライクな演算をシームレスに実行できます。例えば、複雑な定型コードを必要とせずに計算を実行できるため、初心者から経験豊富な開発者まで、誰にとっても使いやすくなります。
Spark環境の設定
集計に進む前に、以降の例に必要なSpark環境を構築しましょう。Apache Sparkをインストールし、OSとワークスペースの設定に応じてScalaとPython用に設定する必要があります。
Scalaの環境設定
Scalaの場合は、まずScala Build Tool(SBT)をインストールします。以下は`build.sbt`ファイルの簡単な設定です。
name := "SparkDataFrameExample"
version := "0.1"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"Pythonの環境設定
Pythonユーザーにとって、PySparkがインストールされていることが必須条件です。pipを使えば簡単にインストールできます。
```bash
pip install pyspark
```環境がセットアップされたら、集計操作を検討する準備が整います。
Spark DataFrames での集計
集計はデータ分析において重要な要素であり、複雑なデータパターンを要約して理解するのに役立ちます。Spark は、`groupBy`、`agg`、そして様々な集計関数など、様々な集計操作のための強力な機能を提供します。
集約にScalaを使用する
Scala を使用した実用的な集計例として、販売データセットを考えてみましょう。
case class Sales(transactionId: Int, product: String, amount: Double, quantity: Int)
val salesData = Seq(
Sales(1, "Widget", 20.0, 5),
Sales(2, "Gadget", 30.0, 8),
Sales(3, "Widget", 20.0, 3),
Sales(4, "Gadget", 30.0, 10)
)
val salesDf = salesData.toDF()ここで、このデータセットに対していくつかの集計を実行してみましょう。
// Total Sales Amount
val totalSales = salesDf.agg(sum("amount").as("total_amount"))
totalSales.show()
// Average Quantity
val averageQuantity = salesDf.agg(avg("quantity").as("average_quantity"))
averageQuantity.show()
// Grouped Aggregation by Product
val salesByProduct = salesDf
.groupBy("product")
.agg(sum("amount").as("total_sales"), avg("quantity").as("average_quantity"))
salesByProduct.show()集計にPythonを使用する
Python では、同じ販売データセットに PySpark を使用すると次のようになります。
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg
spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()
data = [
(1, "Widget", 20.0, 5),
(2, "Gadget", 30.0, 8),
(3, "Widget", 20.0, 3),
(4, "Gadget", 30.0, 10)
]
columns = ["transactionId", "product", "amount", "quantity"]
sales_df = spark.createDataFrame(data, columns)総売上高
total_sales = sales_df.agg(sum("amount").alias("total_amount"))
total_sales.show()平均数量
average_quantity = sales_df.agg(avg("quantity").alias("average_quantity"))
average_quantity.show()製品別のグループ化された集計
sales_by_product = sales_df.groupBy("product").agg(
sum("amount").alias("total_sales"),
avg("quantity").alias("average_quantity")
)
sales_by_product.show()
集計に Spark SQL を使用する
DataFrames の使用に加えて、Spark SQL では DataFrames で直接 SQL クエリを実行できるため、SQL の使いやすさと Spark のパフォーマンスが融合します。
まず、Python を使用して一時ビューを作成します。
sales_df.createOrReplaceTempView("sales")次に、次のような SQL コマンドを実行します。
-- SQL query for total sales amount
SELECT SUM(amount) AS total_amount FROM sales;
-- SQL query for average quantity
SELECT AVG(quantity) AS average_quantity FROM sales;
-- SQL query for aggregated results by product
SELECT product, SUM(amount) AS total_sales, AVG(quantity) AS average_quantity
FROM sales
GROUP BY product;集約の実例
理論的な知識を身に付けた上で、実際の例を詳しく調べて集計についての理解を深めましょう。
販売された個別の製品を数える
異なる値をカウントすることは、様々な分析において非常に重要です。ScalaとPythonでそれを実現する方法をご紹介します。
スカラ
val distinctProductsCount = salesDf.select("product").distinct().count()
println(s"Distinct products sold: $distinctProductsCount")パイソン
distinct_products_count = sales_df.select("product").distinct().count()
print(f"Distinct products sold: {distinct_products_count}")1日あたりの総売上高の計算
日々の売上動向を調べたいとします。まず、`sales` DataFrame に日付情報を追加します。
データを準備する
Python を使用して例に日付列を追加します。
data_with_dates = [
(1, "Widget", 20.0, 5, "2023-10-01"),
(2, "Gadget", 30.0, 8, "2023-10-01"),
(3, "Widget", 20.0, 3, "2023-10-02"),
(4, "Gadget", 30.0, 10, "2023-10-02")
]
columns_with_dates = ["transactionId", "product", "amount", "quantity", "date"]
sales_df_with_dates = spark.createDataFrame(data_with_dates, columns_with_dates)集計例
日付別に売上合計を合計するコードは、Scala と Python の両方で似ています。
スカラ
val dailySales = salesDfWithDates
.groupBy("date")
.agg(sum("amount").as("total_sales"))
dailySales.show()
パイソン
daily_sales = sales_df_with_dates.groupBy("date").agg(sum("amount").alias("total_sales"))
daily_sales.show()最適化手法
集計に Spark を使用する際のパフォーマンスを最大化するには、次の最適化手法を検討してください。
パーティショニングを使用する: 集計に含まれるデータの量を最小限に抑え、プロセスを高速化するため、大規模なデータセットには効果的です。
中間データフレームのキャッシュ: キャッシュにより、不要な再計算が削減され、同じデータセットで複数の操作を実行するときのパフォーマンスが向上します。
ブロードキャスト結合を活用する: 1 つの DataFrame が大幅に小さい場合、ブロードキャストすることで大規模なデータセットのシャッフルを防ぎ、速度を向上させることができます。
洞察のまとめ
Apache Spark DataFramesとSpark SQLを用いた集計を理解し、使いこなすことで、ビッグデータ分析の取り組みを大幅に強化できます。ここで紹介した知識と実践的な例を活用することで、高度なデータ処理を実行し、データセットから貴重な洞察を引き出すためのツールが手に入ります。実験を続け、より深い洞察を発見し、分析能力を向上させましょう。

