top of page

Scala、Python、SQL で Apache Spark DataFrames と Spark SQL を使用した集計をマスターする

更新日:8月18日

ビッグデータの力を最大限に活用したいなら、Apache Spark が頼りになるフレームワークです。堅牢な API と充実したエコシステムを備え、大規模データセットの処理に最適です。特に、DataFrame と Spark SQL を用いた集計機能は、Spark を非常に貴重なツールにしています。この記事では、Scala と Python の両方を用いて、Spark DataFrame と Spark SQL を用いた集計の実行方法を解説します。実践的なコード例も紹介し、理解を深めます。

Spark DataFramesを理解する


Spark DataFramesは、リレーショナルデータベースのテーブルに似た、名前付き列に整理された分散データコレクションです。この構造により、効率的なデータ操作が可能になります。DataFramesは、クエリ最適化のためのCatalystやメモリ管理のためのTungstenエンジンなどの機能により、操作のパフォーマンスを向上させます。


DataFramesの構文はシンプルで直感的であり、関数型演算やSQLライクな演算をシームレスに実行できます。例えば、複雑な定型コードを必要とせずに計算を実行できるため、初心者から経験豊富な開発者まで、誰にとっても使いやすくなります。


Spark環境の設定


集計に進む前に、以降の例に必要なSpark環境を構築しましょう。Apache Sparkをインストールし、OSとワークスペースの設定に応じてScalaとPython用に設定する必要があります。


Scalaの環境設定


Scalaの場合は、まずScala Build Tool(SBT)をインストールします。以下は`build.sbt`ファイルの簡単な設定です。

name := "SparkDataFrameExample"
version := "0.1"
scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"

Pythonの環境設定


Pythonユーザーにとって、PySparkがインストールされていることが必須条件です。pipを使えば簡単にインストールできます。

```bash
pip install pyspark
```

環境がセットアップされたら、集計操作を検討する準備が整います。


Spark DataFrames での集計


集計はデータ分析において重要な要素であり、複雑なデータパターンを要約して理解するのに役立ちます。Spark は、`groupBy`、`agg`、そして様々な集計関数など、様々な集計操作のための強力な機能を提供します。


集約にScalaを使用する


Scala を使用した実用的な集計例として、販売データセットを考えてみましょう。

case class Sales(transactionId: Int, product: String, amount: Double, quantity: Int)

val salesData = Seq(
  Sales(1, "Widget", 20.0, 5),
  Sales(2, "Gadget", 30.0, 8),
  Sales(3, "Widget", 20.0, 3),
  Sales(4, "Gadget", 30.0, 10)
)

val salesDf = salesData.toDF()

ここで、このデータセットに対していくつかの集計を実行してみましょう。

// Total Sales Amount
val totalSales = salesDf.agg(sum("amount").as("total_amount"))

totalSales.show()

// Average Quantity
val averageQuantity = salesDf.agg(avg("quantity").as("average_quantity"))

averageQuantity.show()

// Grouped Aggregation by Product
val salesByProduct = salesDf
  .groupBy("product")
  .agg(sum("amount").as("total_sales"), avg("quantity").as("average_quantity"))

salesByProduct.show()

集計にPythonを使用する


Python では、同じ販売データセットに PySpark を使用すると次のようになります。


from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg

spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()
data = [
    (1, "Widget", 20.0, 5),
    (2, "Gadget", 30.0, 8),
    (3, "Widget", 20.0, 3),
    (4, "Gadget", 30.0, 10)
]

columns = ["transactionId", "product", "amount", "quantity"]
sales_df = spark.createDataFrame(data, columns)

総売上高

total_sales = sales_df.agg(sum("amount").alias("total_amount"))
total_sales.show()

平均数量

average_quantity = sales_df.agg(avg("quantity").alias("average_quantity"))
average_quantity.show()

製品別のグループ化された集計

sales_by_product = sales_df.groupBy("product").agg(
sum("amount").alias("total_sales"),
    avg("quantity").alias("average_quantity")
)

sales_by_product.show()

集計に Spark SQL を使用する


DataFrames の使用に加えて、Spark SQL では DataFrames で直接 SQL クエリを実行できるため、SQL の使いやすさと Spark のパフォーマンスが融合します。


まず、Python を使用して一時ビューを作成します。

sales_df.createOrReplaceTempView("sales")

次に、次のような SQL コマンドを実行します。

-- SQL query for total sales amount
SELECT SUM(amount) AS total_amount FROM sales;

-- SQL query for average quantity
SELECT AVG(quantity) AS average_quantity FROM sales;

-- SQL query for aggregated results by product
SELECT product, SUM(amount) AS total_sales, AVG(quantity) AS average_quantity
FROM sales
GROUP BY product;

集約の実例


理論的な知識を身に付けた上で、実際の例を詳しく調べて集計についての理解を深めましょう。


販売された個別の製品を数える


異なる値をカウントすることは、様々な分析において非常に重要です。ScalaとPythonでそれを実現する方法をご紹介します。


スカラ

val distinctProductsCount = salesDf.select("product").distinct().count()
println(s"Distinct products sold: $distinctProductsCount")

パイソン

distinct_products_count = sales_df.select("product").distinct().count()
print(f"Distinct products sold: {distinct_products_count}")

1日あたりの総売上高の計算


日々の売上動向を調べたいとします。まず、`sales` DataFrame に日付情報を追加します。


データを準備する


Python を使用して例に日付列を追加します。
data_with_dates = [
    (1, "Widget", 20.0, 5, "2023-10-01"),
    (2, "Gadget", 30.0, 8, "2023-10-01"),
    (3, "Widget", 20.0, 3, "2023-10-02"),
    (4, "Gadget", 30.0, 10, "2023-10-02")
]

columns_with_dates = ["transactionId", "product", "amount", "quantity", "date"]

sales_df_with_dates = spark.createDataFrame(data_with_dates, columns_with_dates)

集計例

日付別に売上合計を合計するコードは、Scala と Python の両方で似ています。


スカラ

val dailySales = salesDfWithDates
  .groupBy("date")
  .agg(sum("amount").as("total_sales"))

dailySales.show()

パイソン

daily_sales = sales_df_with_dates.groupBy("date").agg(sum("amount").alias("total_sales"))

daily_sales.show()

最適化手法


集計に Spark を使用する際のパフォーマンスを最大化するには、次の最適化手法を検討してください。


  1. パーティショニングを使用する: 集計に含まれるデータの量を最小限に抑え、プロセスを高速化するため、大規模なデータセットには効果的です。


  2. 中間データフレームのキャッシュ: キャッシュにより、不要な再計算が削減され、同じデータセットで複数の操作を実行するときのパフォーマンスが向上します。


  3. ブロードキャスト結合を活用する: 1 つの DataFrame が大幅に小さい場合、ブロードキャストすることで大規模なデータセットのシャッフルを防ぎ、速度を向上させることができます。


洞察のまとめ


Apache Spark DataFramesとSpark SQLを用いた集計を理解し、使いこなすことで、ビッグデータ分析の取り組みを大幅に強化できます。ここで紹介した知識と実践的な例を活用することで、高度なデータ処理を実行し、データセットから貴重な洞察を引き出すためのツールが手に入ります。実験を続け、より深い洞察を発見し、分析能力を向上させましょう。


Spark のロゴが表示された最新のデータ処理セットアップの広角ビュー
Data processing with Spark DataFrames and SQL


bottom of page