top of page

Aggregationen mit Apache Spark DataFrames und Spark SQL in Scala, Python und SQL meistern

  • Autorenbild: Claude Paugh
    Claude Paugh
  • 28. Apr.
  • 4 Min. Lesezeit

Aktualisiert: 20. Mai

Wenn Sie das Potenzial von Big Data voll ausschöpfen möchten, ist Apache Spark das ideale Framework. Es bietet robuste APIs und ein umfassendes Ökosystem – ideal für die Verarbeitung großer Datensätze. Insbesondere die Fähigkeit von Spark, Aggregationen mit DataFrames und Spark SQL durchzuführen, macht es zu einem unverzichtbaren Werkzeug. Dieser Beitrag führt Sie durch die Durchführung von Aggregationen mit Spark DataFrames und Spark SQL in Scala und Python. Praktische Codebeispiele vertiefen Ihr Verständnis.


Grundlegendes zu Spark DataFrames


Spark DataFrames sind verteilte Datensammlungen, die in benannten Spalten organisiert sind, ähnlich wie Tabellen in einer relationalen Datenbank. Diese Struktur ermöglicht eine effiziente Datenmanipulation. DataFrames verbessern die Leistung von Operationen durch Funktionen wie Catalyst zur Abfrageoptimierung und die Tungsten-Engine zur Speicherverwaltung.


Die Syntax für DataFrames ist einfach und intuitiv und ermöglicht die nahtlose Ausführung funktionaler und SQL-ähnlicher Operationen. So können Sie beispielsweise Berechnungen ohne umfangreichen Boilerplate-Code durchführen, was sowohl für Einsteiger als auch für erfahrene Entwickler eine Erleichterung darstellt.


Einrichten Ihrer Spark-Umgebung


Bevor wir uns mit Aggregationen befassen, richten wir die für die folgenden Beispiele erforderliche Spark-Umgebung ein. Dazu installieren wir Apache Spark und konfigurieren es je nach Betriebssystem und Arbeitsbereichseinstellungen für Scala und Python.


Umgebungs-Setup für Scala


Für Scala installieren Sie zunächst das Scala Build Tool (SBT). Nachfolgend finden Sie eine einfache Konfiguration für Ihre Datei „build.sbt“:

name := "SparkDataFrameExample"
version := "0.1"
scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"

Umgebungs-Setup für Python


Für Python-Benutzer ist die Installation von PySpark die wichtigste Voraussetzung. Sie können es einfach über pip installieren:

```bash
pip install pyspark
```

Nachdem Sie Ihre Umgebung eingerichtet haben, können Sie Aggregationsvorgänge erkunden.


Aggregationen in Spark DataFrames


Aggregation ist ein wichtiger Aspekt der Datenanalyse, der es Ihnen ermöglicht, komplexe Datenmuster zusammenzufassen und zu verstehen. Spark bietet leistungsstarke Funktionen für verschiedene Aggregationsvorgänge, darunter „groupBy“, „agg“ und eine Reihe von Aggregatfunktionen.


Verwenden von Scala für Aggregationen


Betrachten wir einen Verkaufsdatensatz für praktische Aggregationsbeispiele mit Scala:

case class Sales(transactionId: Int, product: String, amount: Double, quantity: Int)

val salesData = Seq(
  Sales(1, "Widget", 20.0, 5),
  Sales(2, "Gadget", 30.0, 8),
  Sales(3, "Widget", 20.0, 3),
  Sales(4, "Gadget", 30.0, 10)
)

val salesDf = salesData.toDF()

Lassen Sie uns nun einige Aggregationen für diesen Datensatz durchführen:

// Total Sales Amount
val totalSales = salesDf.agg(sum("amount").as("total_amount"))

totalSales.show()

// Average Quantity
val averageQuantity = salesDf.agg(avg("quantity").as("average_quantity"))

averageQuantity.show()

// Grouped Aggregation by Product
val salesByProduct = salesDf
  .groupBy("product")
  .agg(sum("amount").as("total_sales"), avg("quantity").as("average_quantity"))

salesByProduct.show()

Verwenden von Python für Aggregationen


In Python sieht die Verwendung von PySpark für denselben Verkaufsdatensatz folgendermaßen aus:


from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg

spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()
data = [
    (1, "Widget", 20.0, 5),
    (2, "Gadget", 30.0, 8),
    (3, "Widget", 20.0, 3),
    (4, "Gadget", 30.0, 10)
]

columns = ["transactionId", "product", "amount", "quantity"]
sales_df = spark.createDataFrame(data, columns)

Gesamtverkaufsbetrag

total_sales = sales_df.agg(sum("amount").alias("total_amount"))
total_sales.show()

Durchschnittliche Menge

average_quantity = sales_df.agg(avg("quantity").alias("average_quantity"))
average_quantity.show()

Gruppierte Aggregation nach Produkt

sales_by_product = sales_df.groupBy("product").agg(
sum("amount").alias("total_sales"),
    avg("quantity").alias("average_quantity")
)

sales_by_product.show()

Verwenden von Spark SQL für Aggregationen


Zusätzlich zur Verwendung von DataFrames ermöglicht Spark SQL die Ausführung von SQL-Abfragen direkt auf DataFrames und kombiniert so die Benutzerfreundlichkeit von SQL mit der Leistung von Spark.


Beginnen Sie mit der Erstellung einer temporären Ansicht mit Python:

sales_df.createOrReplaceTempView("sales")

Führen Sie als Nächstes SQL-Befehle wie diese aus:

-- SQL query for total sales amount
SELECT SUM(amount) AS total_amount FROM sales;

-- SQL query for average quantity
SELECT AVG(quantity) AS average_quantity FROM sales;

-- SQL query for aggregated results by product
SELECT product, SUM(amount) AS total_sales, AVG(quantity) AS average_quantity
FROM sales
GROUP BY product;

Praktische Beispiele für Aggregationen


Ausgestattet mit theoretischem Wissen vertiefen wir uns nun in praktische Beispiele, um Ihr Verständnis von Aggregationen zu stärken.


Anzahl der verkauften Produkte


Das Zählen unterschiedlicher Werte ist für verschiedene Analysen entscheidend. So erreichen Sie es in Scala und Python.


Scala

val distinctProductsCount = salesDf.select("product").distinct().count()
println(s"Distinct products sold: $distinctProductsCount")

Python

distinct_products_count = sales_df.select("product").distinct().count()
print(f"Distinct products sold: {distinct_products_count}")

Berechnung des Gesamtumsatzes pro Tag


Stellen Sie sich vor, Sie möchten tägliche Verkaufstrends untersuchen. Zunächst würden Sie den DataFrame „sales“ mit Datumsinformationen ergänzen.


Vorbereiten der Daten


Hinzufügen einer Datumsspalte für das Beispiel mit Python:
data_with_dates = [
    (1, "Widget", 20.0, 5, "2023-10-01"),
    (2, "Gadget", 30.0, 8, "2023-10-01"),
    (3, "Widget", 20.0, 3, "2023-10-02"),
    (4, "Gadget", 30.0, 10, "2023-10-02")
]

columns_with_dates = ["transactionId", "product", "amount", "quantity", "date"]

sales_df_with_dates = spark.createDataFrame(data_with_dates, columns_with_dates)

Aggregationsbeispiel

Der Code zum Summieren der Gesamtverkäufe nach Datum sieht in Scala und Python ähnlich aus:


Scala
val dailySales = salesDfWithDates
  .groupBy("date")
  .agg(sum("amount").as("total_sales"))

dailySales.show()
Python
daily_sales = sales_df_with_dates.groupBy("date").agg(sum("amount").alias("total_sales"))

daily_sales.show()

Optimierungstechniken


Um die Leistung bei der Verwendung von Spark für Aggregationen zu maximieren, sollten Sie diese Optimierungstechniken in Betracht ziehen:


  1. Partitionierung verwenden : Dies ist bei großen Datensätzen von Vorteil, da dadurch die Menge der bei der Aggregation beteiligten Daten minimiert und der Prozess dadurch beschleunigt wird.


  2. Zwischenspeichern von DataFrames : Durch das Caching kann die Leistung beim Ausführen mehrerer Vorgänge auf demselben Datensatz gesteigert werden, indem unnötige Neuberechnungen reduziert werden.


  3. Nutzen Sie Broadcast-Joins : Wenn ein DataFrame deutlich kleiner ist, kann durch dessen Broadcasting das Mischen großer Datensätze verhindert und so die Geschwindigkeit verbessert werden.


Zusammenfassende Erkenntnisse


Das Verstehen und Beherrschen von Aggregationen mit Apache Spark DataFrames und Spark SQL kann Ihre Big-Data-Analyse deutlich verbessern. Mit dem bereitgestellten Wissen und den praktischen Beispielen verfügen Sie nun über die Werkzeuge für erweiterte Datenverarbeitung und die Gewinnung wertvoller Erkenntnisse aus Ihren Datensätzen. Experimentieren Sie weiter, um tiefere Einblicke zu gewinnen und Ihre Analysefähigkeiten zu verbessern!


Weitwinkelansicht eines modernen Datenverarbeitungs-Setups mit Spark-Logo
Data processing with Spark DataFrames and SQL


+1 508-203-1492

Bedford, MA 01730

bottom of page