Comment exploiter Python Dask pour un traitement et une analyse de données évolutifs
- Claude Paugh
- 24 juil.
- 8 min de lecture
Dans un monde axé sur les données, traiter et analyser efficacement de grands volumes de données peut représenter un défi majeur pour les ingénieurs logiciels et les data scientists. Les bibliothèques de traitement de données traditionnelles comme Pandas, bien que conviviales, peuvent avoir des difficultés à gérer les vastes volumes de données auxquels de nombreuses organisations sont confrontées. C'est là que la bibliothèque Dask devient essentielle.
Grâce à la bibliothèque Python Dask, vous pouvez facilement effectuer des calculs complexes sur le Big Data avec Python. Ces calculs sont également réalisables sur des processeurs moins coûteux que sur des GPU. Il est donc important de distinguer le traitement et le prétraitement des données réalisables sur les processeurs, des opérations algorithmiques et du traitement d'images/vidéos plus adaptés aux GPU.
Dans cet article de blog, nous allons plonger dans les capacités de la bibliothèque Dask, apprendre à l'intégrer dans votre flux de travail et, en fin de compte, exploiter sa puissance pour optimiser vos tâches de traitement de données.
Présentation de la bibliothèque Python Dask
Dask est une bibliothèque de calcul parallèle open source qui permet aux utilisateurs de faire évoluer efficacement des applications Python, d'une machine unique à de grands clusters. Contrairement à Pandas, qui fonctionne principalement en mémoire, Dask excelle dans la gestion des calculs parallèles et peut gérer des ensembles de données plus volumineux que la mémoire disponible.
Les principaux composants de Dask comprennent :
Dask Arrays : pour les grands tableaux multidimensionnels.
Dask DataFrames : pour manipuler de grands ensembles de données similaires aux Pandas DataFrames.
Dask Bags : pour le traitement de données non structurées, semblables à la liste de Python.
La véritable beauté de Dask réside dans sa capacité à s’intégrer aux bibliothèques Python existantes et à fonctionner de manière transparente avec elles.
Installation de Dask
Avant de plonger dans Dask, vous devez l'installer dans votre environnement.
Vous pouvez installer Dask facilement en utilisant pip :
```bash
pip install dask
```
Si vous prévoyez d'utiliser Dask avec le planificateur distribué, incluez les éléments suivants pour bénéficier de toutes les fonctionnalités :
```bash
pip install dask[distributed]
```
Une fois installé, vérifiez votre installation en vérifiant la version de Dask dans votre environnement Python :
import dask
print(dask.__version__)
Dask Arrays : calcul hors cœur
Les tableaux Dask sont un outil puissant pour travailler avec des données numériques volumineuses qui peuvent ne pas tenir en mémoire. Un tableau trop volumineux peut être divisé en blocs plus petits, ce qui permet d'effectuer des calculs en parallèle.
Création de tableaux Dask
Vous pouvez facilement créer des tableaux Dask avec NumPy. Par exemple :
import dask.array as da
import numpy as np
Créer un grand tableau NumPy
numpy_array = np.random.rand(10000, 10000)
Créer un tableau Dask à partir du tableau NumPy
x = da.from_array(numpy_array, chunks=(1000, 1000))
Ici, `numpy_array` peut être n'importe quel grand tableau, et `chunks` contrôle la façon dont le tableau est divisé.
Opérations de base
Les tableaux Dask permettent des opérations similaires à celles de NumPy, mais les calculs ne s'exécutent que si vous les demandez explicitement. Par exemple :
result = (x + 1).mean()
Pour calculer le résultat, vous utiliseriez :
final_result = result.compute()
Cette approche peut conduire à des améliorations significatives des performances, en particulier lorsque vous travaillez avec de grands ensembles de données.
Dask DataFrames : une interface familière pour le Big Data
Si vous connaissez Pandas, vous trouverez les DataFrames Dask intuitifs. Ils permettent de traiter de grands ensembles de données via une interface familière, tout en exploitant les avantages du calcul parallèle.
Création de DataFrames Dask
Vous pouvez créer un DataFrame Dask en lisant des fichiers CSV ou en convertissant un DataFrame Pandas, comme indiqué ici :
import dask.dataframe as dd
Lire un fichier CSV volumineux dans un DataFrame Dask
df = dd.read_csv('large_dataset.csv')
Cette opération divise le fichier CSV en plusieurs partitions, permettant ainsi un traitement par lots en parallèle. Dask prend également en charge la lecture depuis des tables JSON, HDF5, Parquet, Orc et des bases de données utilisant SQL. Les DataFrames Dask peuvent écrire vers des fichiers CSV, HDF5, Parquet et SQL.
Opérations sur les DataFrames Dask
Les opérations DataFrame courantes telles que le filtrage et la fusion peuvent être effectuées facilement :
filtered_df = df[df['avg_gain'] > 30]
Ces opérations sont paresseuses, ce qui signifie que Dask n'exécute pas les tâches jusqu'à ce que vous appeliez
.compute()
Explorer les Dask Bags : travailler avec des données non structurées
Les Dask Bags sont extrêmement utiles pour traiter des collections d'objets Python. Ceci est particulièrement avantageux pour la gestion de fichiers JSON, de données texte ou de tout type non structuré. La fonction from_sequence permet de lire les itérables Python. La lecture depuis les formats Avro est également possible. Les Dask Bags peuvent également écrire aux formats CSV, JSON et Avro.
Création de sacs de soirée
Vous pouvez créer un Dask Bag en lisant un fichier JSON :
import dask.bag as db
bag = db.read_text('data/*.json')
Opérations sur les sacs Dask
Les Dask Bags permettent des opérations de mappage et de filtrage standard. Voici un exemple d'opération de mappage :
mapped_bag = bag.map(lambda x: x['slope'])
Similaire aux tableaux et aux DataFrames, n'oubliez pas d'appeler `.compute()` pour exécuter les opérations :
final_output = mapped_bag.compute()
Planification avec Dask
Dask utilise différents ordonnanceurs selon vos besoins. Les principales options sont l'ordonnanceur monothread et l'ordonnanceur multithread ou distribué.
Planificateur local
Le planificateur local est facile à utiliser pour les petites charges de travail. Il exploite les capacités de threading de Python pour exécuter des tâches en parallèle, ce qui le rend idéal pour les calculs rapides sur des ensembles de données de taille moyenne.
Planificateur distribué
Pour les jeux de données volumineux ou les calculs intensifs, l'utilisation du module « dask.distributed » peut améliorer considérablement les performances. Configurez un cluster Dask pour gérer les calculs sur plusieurs nœuds de calcul. Voici une configuration rapide :
from dask.distributed import Client
client = Client() # This starts a local cluster
# or
client = Client(processes=False)
Vous pouvez surveiller l'état de votre cluster et les soumissions de tâches, ce qui vous permet de mieux contrôler vos ressources. Le planificateur multi-nœuds nécessite l'installation de Dask sur chaque nœud, ainsi qu'une configuration supplémentaire pour le planificateur et les workers.
Pour un nœud unique, le planificateur global par défaut en utilisant dask.config.set(scheduler...) commande. Ceci peut être fait globalement :
dask.config.set(scheduler='threads')
x.compute()
ou en tant que gestionnaire de contexte :
with dask.config.set(scheduler='threads'):
x.compute()
Le planificateur mono-machine basé sur un pool vous permet de fournir des pools personnalisés ou de spécifier le nombre de travailleurs souhaité :
from concurrent.futures import ThreadPoolExecutor
with dask.config.set(pool=ThreadPoolExecutor(4)):
x.compute()
with dask.config.set(num_workers=4):
x.compute()
Si vous êtes intéressé par un déploiement cloud de Dask avec ou sans Kubernetes, je vous suggère de lire Déploiement de clusters Dask dans la documentation Dask.
Il couvre des scénarios pour un nœud unique, le cloud, Kubernetes et le HPC. Des connaissances avancées sont également disponibles à la fin de la documentation. Il aborde des sujets opérationnels et des références concernant l'utilisation de Coiled et de Saturn Cloud comme options SaaS plutôt que de les utiliser soi-même.
Si vous rencontrez des problèmes d'évolutivité lorsque l'interpréteur Python ne publie pas le GIL, le déploiement de workers est probablement indispensable. Vous pouvez personnaliser le nombre de threads par worker, mais la règle est d'utiliser un thread par cœur de processeur (CPU ou vCPU) souhaité.
Cela peut être réparti sur plusieurs workers ou sur un seul. Par exemple, si mon fournisseur cloud (pas Kubernetes, bien sûr) m'attribue 12 vCPU, je peux configurer les éléments suivants :
dask worker --nworkers 12 --nthreads 1 tcp://192.0.0.100:8786
# or
dask worker --nworkers 1 --nthreads 12 tcp://192.0.0.100:8786
Il s'agit évidemment d'options de ligne de commande exécutées dans un shell ou un script. La référence complète de la ligne de commande est disponible ici . Dans ces exemples, l'adresse IPv4 est utilisée ; l'utilisation d'un nom d'hôte est préférable, et Dask prend en charge la résolution de noms pour le DNS. Il est également important de noter que vous pouvez imposer des limites de mémoire aux workers lors du démarrage en ligne de commande.
Démarrer le planificateur à partir de la ligne de commande comme ceci :
$ dask scheduler
Scheduler at: tcp://192.0.0.100:8786
Vous remarquerez que le worker Dask pointe vers le planificateur à utiliser au démarrage. Un inconvénient de Dask, que vous pouvez contourner, est le risque de point de défaillance unique avec un seul planificateur. Si vous utilisez un fournisseur cloud, il propose des options permettant de créer plusieurs nœuds répondant à une même requête d'adresse. L'étude de ces options peut donc atténuer le risque de SPOF.
Après le démarrage du planificateur et des workers, ceux-ci sont tous deux dotés de serveurs web de diagnostic. Leur URL est identique à l'adresse IP ou à l'hôte déployé, utilisant le port 8787. Vous pouvez ainsi surveiller les planificateurs et les workers. Vous verrez les graphiques des tâches, la consommation de mémoire des workers et l'activité des threads. Pour en savoir plus sur les points de terminaison HTTP, cliquez ici .
Optimiser les performances avec Dask
Pour maximiser le potentiel de Dask, l'optimisation des performances est essentielle. Voici deux stratégies qui peuvent donner d'excellents résultats :
Partitionnement des données
Un partitionnement approprié de vos données en blocs gérables améliore les performances. Par exemple, dans les DataFrames Dask, spécifier explicitement le nombre de partitions peut accélérer les calculs.
Chaque partition est essentiellement un dataframe Pandas encapsulé, car les fonctionnalités de Dask Dataframe fournissent essentiellement Pandas à grande échelle. Par exemple, si vous disposez d'un jeu de données dans un dataframe Dask d'un milliard de lignes et souhaitez 100 partitions, Dask crée 10 millions de dataframes Pandas en arrière-plan.
Dask gère l'accès aux données de haut niveau, mais chaque dataframe sous-jacent est une implémentation Pandas. Vous pouvez utiliser le mot-clé args de Pandas, mais des limites sont imposées selon la version de Pandas utilisée par Dask.
Éviter trop de petites tâches
Créer des tâches trop petites peut entraîner des inefficacités. Chaque tâche engendre des frais généraux, et une granularité excessive peut annuler les avantages du traitement parallèle. Un nombre réduit de tâches plus importantes est plus adapté à Dask à grande échelle.
Cas d'utilisation courants de Dask
Dask est applicable dans divers scénarios de traitement et d'analyse de données :
Analyse de données à grande échelle
Les DataFrames Dask sont idéaux pour analyser de grands ensembles de données dépassant les limites de mémoire, fonctionnant de manière similaire à Pandas pour les analyses standard. Vérifiez la version de Pandas utilisée par Dask pour connaître les fonctionnalités prises en charge.
Apprentissage automatique
L'intégration de Dask avec des bibliothèques populaires comme Scikit-learn est transparente. Par exemple, vous pouvez utiliser Dask-ML pour adapter efficacement les tâches de machine learning à de grands ensembles de données, améliorant ainsi considérablement les temps de traitement. Dask offre un traitement parallèle distribué des fonctions Python ; si vous avez des fonctions, elles peuvent donc être parallélisées en utilisant simplement un décorateur. L'imbrication est toutefois limitée.
Ingestion et transformation des données
Dask simplifie le processus de lecture et de transformation de grands ensembles de données, ce qui est crucial pour les étapes de prétraitement avant l'analyse, vous permettant de gérer facilement plusieurs formats.
Exploiter Dask pour vos flux de données
L'utilisation de Dask pour le traitement et l'analyse évolutifs des données offre d'excellents avantages, notamment pour les opérations de données à grande échelle. En comprenant ses fonctionnalités clés, vous pouvez intégrer efficacement Dask à votre workflow de science des données Python.
Les capacités de traitement parallèle de Dask vous permettent de gérer des opérations qui surchargent les outils traditionnels. Qu'il s'agisse de grands tableaux, de DataFrames ou de données non structurées, Dask ouvre de nouvelles perspectives pour une manipulation efficace des données.
Vous êtes désormais bien équipé pour utiliser la puissance de Dask, rendant vos tâches de traitement de données à la fois efficaces et évolutives.
À mesure que vous explorez le traitement évolutif des données, pensez à expérimenter avec Dask pour voir comment il peut améliorer votre flux de travail. Bon codage !