top of page

Maîtriser les agrégations avec Apache Spark DataFrames et Spark SQL en Scala, Python et SQL

  • Photo du rédacteur: Claude Paugh
    Claude Paugh
  • 28 avr.
  • 4 min de lecture

Dernière mise à jour : 3 mai

Si vous souhaitez exploiter la puissance du Big Data, Apache Spark est le framework idéal. Il offre des API robustes et un écosystème riche, parfaits pour le traitement de grands ensembles de données. Sa capacité à effectuer des agrégations à l'aide de DataFrames et de Spark SQL en fait un outil précieux. Cet article vous guidera dans la réalisation d'agrégations avec Spark DataFrames et Spark SQL, en utilisant Scala et Python. Vous découvrirez des exemples de code pratiques pour consolider vos connaissances.


Comprendre les DataFrames Spark


Les DataFrames Spark sont des collections de données distribuées organisées en colonnes nommées, à l'instar des tables d'une base de données relationnelle. Cette structure permet une manipulation efficace des données. Les DataFrames améliorent les performances des opérations grâce à des fonctionnalités telles que Catalyst pour l'optimisation des requêtes et le moteur Tungsten pour la gestion de la mémoire.


La syntaxe des DataFrames est simple et intuitive, ce qui vous permet d'exécuter des opérations fonctionnelles et de type SQL de manière fluide. Par exemple, vous pouvez effectuer des calculs sans avoir recours à du code standard complexe, ce qui simplifie la tâche des développeurs débutants comme expérimentés.


Configuration de votre environnement Spark


Avant de nous plonger dans les agrégations, établissons l'environnement Spark nécessaire aux exemples suivants. Cela implique l'installation d'Apache Spark et sa configuration pour Scala et Python, selon votre système d'exploitation et vos préférences d'espace de travail.



Configuration de l'environnement pour Scala


Pour Scala, commencez par installer Scala Build Tool (SBT). Voici une configuration simple pour votre fichier « build.sbt » :

name := "SparkDataFrameExample"
version := "0.1"
scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"

Configuration de l'environnement pour Python


Pour les utilisateurs de Python, l'installation de PySpark est essentielle. Vous pouvez l'installer facilement via pip :

```bash
pip install pyspark
```

Une fois votre environnement configuré, vous êtes prêt à explorer les opérations d’agrégation.


Agrégations dans Spark DataFrames


L'agrégation est un aspect essentiel de l'analyse de données, permettant de synthétiser et de comprendre des modèles de données complexes. Spark offre de puissantes fonctionnalités pour diverses opérations d'agrégation, notamment « groupBy », « agg » et diverses fonctions d'agrégation.


Utilisation de Scala pour les agrégations


Considérons un ensemble de données de ventes pour des exemples pratiques d'agrégation avec Scala :

case class Sales(transactionId: Int, product: String, amount: Double, quantity: Int)

val salesData = Seq(
  Sales(1, "Widget", 20.0, 5),
  Sales(2, "Gadget", 30.0, 8),
  Sales(3, "Widget", 20.0, 3),
  Sales(4, "Gadget", 30.0, 10)
)

val salesDf = salesData.toDF()

Maintenant, effectuons quelques agrégations sur cet ensemble de données :

// Total Sales Amount
val totalSales = salesDf.agg(sum("amount").as("total_amount"))

totalSales.show()

// Average Quantity
val averageQuantity = salesDf.agg(avg("quantity").as("average_quantity"))

averageQuantity.show()

// Grouped Aggregation by Product
val salesByProduct = salesDf
  .groupBy("product")
  .agg(sum("amount").as("total_sales"), avg("quantity").as("average_quantity"))

salesByProduct.show()

Utilisation de Python pour les agrégations


En Python, l'utilisation de PySpark pour le même ensemble de données de ventes ressemble à ceci :


from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg

spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()
data = [
    (1, "Widget", 20.0, 5),
    (2, "Gadget", 30.0, 8),
    (3, "Widget", 20.0, 3),
    (4, "Gadget", 30.0, 10)
]

columns = ["transactionId", "product", "amount", "quantity"]
sales_df = spark.createDataFrame(data, columns)

Montant total des ventes

total_sales = sales_df.agg(sum("amount").alias("total_amount"))
total_sales.show()

Quantité moyenne

average_quantity = sales_df.agg(avg("quantity").alias("average_quantity"))
average_quantity.show()

Agrégation groupée par produit

sales_by_product = sales_df.groupBy("product").agg(
sum("amount").alias("total_sales"),
    avg("quantity").alias("average_quantity")
)

sales_by_product.show()

Utilisation de Spark SQL pour les agrégations


En plus d'utiliser des DataFrames, Spark SQL permet d'exécuter des requêtes SQL directement sur des DataFrames, alliant la facilité d'utilisation de SQL aux performances de Spark.


Commencez par créer une vue temporaire avec Python :

sales_df.createOrReplaceTempView("sales")

Ensuite, exécutez les commandes SQL comme celles-ci :

-- SQL query for total sales amount
SELECT SUM(amount) AS total_amount FROM sales;

-- SQL query for average quantity
SELECT AVG(quantity) AS average_quantity FROM sales;

-- SQL query for aggregated results by product
SELECT product, SUM(amount) AS total_sales, AVG(quantity) AS average_quantity
FROM sales
GROUP BY product;

Exemples pratiques d'agrégations


Munis de connaissances théoriques, plongeons-nous dans des exemples pratiques pour renforcer votre compréhension des agrégations.


Compter les produits distincts vendus


Compter des valeurs distinctes est crucial pour diverses analyses. Voici comment y parvenir en Scala et Python.


Scala

val distinctProductsCount = salesDf.select("product").distinct().count()
println(s"Distinct products sold: $distinctProductsCount")

Python

distinct_products_count = sales_df.select("product").distinct().count()
print(f"Distinct products sold: {distinct_products_count}")

Calcul des ventes totales par jour


Imaginez que vous souhaitiez analyser les tendances quotidiennes des ventes. Commencez par enrichir le DataFrame « ventes » avec des informations de date.


Préparez les données


Ajout d'une colonne de date pour l'exemple avec Python :
data_with_dates = [
    (1, "Widget", 20.0, 5, "2023-10-01"),
    (2, "Gadget", 30.0, 8, "2023-10-01"),
    (3, "Widget", 20.0, 3, "2023-10-02"),
    (4, "Gadget", 30.0, 10, "2023-10-02")
]

columns_with_dates = ["transactionId", "product", "amount", "quantity", "date"]

sales_df_with_dates = spark.createDataFrame(data_with_dates, columns_with_dates)

Exemple d'agrégation

Le code permettant de additionner les ventes totales par date est similaire en Scala et en Python :


Scala
val dailySales = salesDfWithDates
  .groupBy("date")
  .agg(sum("amount").as("total_sales"))

dailySales.show()
Python
daily_sales = sales_df_with_dates.groupBy("date").agg(sum("amount").alias("total_sales"))

daily_sales.show()

Techniques d'optimisation


Pour maximiser les performances lors de l’utilisation de Spark pour les agrégations, tenez compte de ces techniques d’optimisation :


  1. Utiliser le partitionnement : il est bénéfique pour les grands ensembles de données car il minimise la quantité de données impliquées dans les agrégations, accélérant ainsi le processus.


  2. Cache des trames de données intermédiaires : la mise en cache peut améliorer les performances lors de l'exécution de plusieurs opérations sur le même ensemble de données en réduisant les recalculs inutiles.


  3. Tirez parti des jointures de diffusion : lorsqu'un DataFrame est considérablement plus petit, sa diffusion peut empêcher le mélange de grands ensembles de données, améliorant ainsi la vitesse.


Récapitulation des idées


Comprendre et maîtriser les agrégations avec Apache Spark DataFrames et Spark SQL peut considérablement améliorer vos analyses Big Data. Grâce aux connaissances et aux exemples pratiques fournis, vous disposez désormais des outils nécessaires pour effectuer des traitements de données avancés et extraire des informations précieuses de vos jeux de données. Continuez vos expérimentations pour obtenir des informations plus précises et améliorer vos capacités d'analyse !


Vue grand angle d'une configuration de traitement de données moderne avec le logo Spark
Data processing with Spark DataFrames and SQL


+1 508-203-1492

Bedford, MA 01730

bottom of page