So nutzen Sie Python Dask für skalierbare Datenverarbeitung und -analyse
- Claude Paugh
- 25. Apr.
- 7 Min. Lesezeit
Aktualisiert: 3. Mai
In der heutigen datengesteuerten Welt kann die effiziente Verarbeitung und Analyse großer Datensätze für Softwareentwickler und Datenwissenschaftler eine erhebliche Herausforderung darstellen. Herkömmliche Datenverarbeitungsbibliotheken wie Pandas sind zwar intuitiv, können jedoch bei den großen Datenmengen, die viele Organisationen verarbeiten, schwierig zu handhaben sein. Hier wird die Dask-Bibliothek unverzichtbar.
Mit der Python-Bibliothek Dask können Sie mithilfe von Python problemlos komplexe Berechnungen mit Big Data durchführen. Dies ist auch mit kostengünstigeren CPUs als mit GPUs möglich. Daher ist es wichtig, zwischen Datenmanipulation und Vorverarbeitung, die auf der CPU durchgeführt werden können, und algorithmischen Operationen sowie Bild- und Videoverarbeitung zu unterscheiden, die besser für GPUs geeignet sind.
In diesem Blogbeitrag gehen wir auf die Funktionen der Dask-Bibliothek ein, erfahren, wie Sie sie in Ihren Arbeitsablauf integrieren und letztendlich ihre Leistungsfähigkeit zur Optimierung Ihrer Datenverarbeitungsaufgaben nutzen.
Übersicht über die Python DASK-Bibliothek
Dask ist eine Open-Source-Bibliothek für paralleles Rechnen, mit der Benutzer Python-Anwendungen effizient von einzelnen Maschinen auf große Cluster skalieren können. Im Gegensatz zu Pandas, das hauptsächlich im Speicher arbeitet, zeichnet sich Dask durch die Verwaltung paralleler Berechnungen aus und kann Datensätze verarbeiten, die größer sind als der verfügbare Speicher.
Zu den Hauptkomponenten von Dask gehören:
Dask-Arrays – Für große, mehrdimensionale Arrays.
Dask DataFrames – zum Bearbeiten großer Datensätze ähnlich wie Pandas DataFrames.
Dask Bags – zur Verarbeitung unstrukturierter Daten, ähnlich der Liste von Python.
Das wirklich Schöne an Dask ist seine Fähigkeit, sich in vorhandene Python-Bibliotheken zu integrieren und nahtlos mit ihnen zu arbeiten.
Dask installieren
Bevor Sie sich in Dask vertiefen, müssen Sie es in Ihrer Umgebung installieren.
Sie können Dask ganz einfach mit pip installieren:
```bash
pip install dask
```
Wenn Sie Dask mit dem verteilten Scheduler verwenden möchten, schließen Sie Folgendes ein, um die volle Funktionalität zu erhalten:
```bash
pip install dask[distributed]
```
Überprüfen Sie nach der Installation Ihre Installation, indem Sie die Dask-Version in Ihrer Python-Umgebung prüfen:
import dask
print(dask.__version__)
Dask-Arrays: Out-of-Core-Computing
Dask-Arrays sind ein leistungsstarkes Tool für die Arbeit mit umfangreichen numerischen Daten, die möglicherweise nicht in den Speicher passen. Ein Array, das zu groß für den Speicher ist, kann in kleinere Teile aufgeteilt werden, was parallele Berechnungen ermöglicht.
Erstellen von Dask-Arrays
Sie können Dask-Arrays ganz einfach aus NumPy erstellen. Zum Beispiel:
import dask.array as da
import numpy as np
Erstellen Sie ein großes NumPy-Array
numpy_array = np.random.rand(10000, 10000)
Erstellen Sie ein Dask-Array aus einem NumPy-Array
x = da.from_array(numpy_array, chunks=(1000, 1000))
Hier kann „numpy_array“ ein beliebig großes Array sein und „chunks“ steuert, wie das Array aufgeteilt wird.
Grundlegende Operationen
Dask-Arrays ermöglichen NumPy-ähnliche Operationen, Berechnungen werden jedoch erst ausgeführt, wenn sie ausdrücklich angefordert werden. Zum Beispiel:
result = (x + 1).mean()
Um das Ergebnis zu berechnen, verwenden Sie:
final_result = result.compute()
Dieser Ansatz kann zu erheblichen Leistungsverbesserungen führen, insbesondere bei der Arbeit mit großen Datensätzen.
Dask DataFrames: Eine vertraute Schnittstelle für Big Data
Wenn Sie mit Pandas vertraut sind, ist die Verwendung von Dask DataFrames intuitiv. Sie ermöglichen die Verarbeitung großer Datensätze über eine vertraute Schnittstelle, nutzen aber die Vorteile der Parallelverarbeitung.
Erstellen von Dask-Datenrahmen
Sie können einen Dask DataFrame erstellen, indem Sie CSV-Dateien lesen oder einen Pandas DataFrame konvertieren, wie hier gezeigt:
import dask.dataframe as dd
Lesen einer großen CSV-Datei in einen Dask DataFrame
df = dd.read_csv('large_dataset.csv')
Dieser Vorgang teilt die CSV-Datei in mehrere Partitionen auf und ermöglicht so eine parallele Stapelverarbeitung. Dask unterstützt auch das Lesen von JSON, HDF5, Parquet, Orc und Datenbanktabellen mit SQL. Dask DataFrames können in CSV, HDF5, Parquet und SQL schreiben.
Operationen an Dask-Datenrahmen
Gängige DataFrame-Operationen wie Filtern und Zusammenführen können problemlos durchgeführt werden:
filtered_df = df[df['avg_gain'] > 30]
Diese Vorgänge sind verzögert, was bedeutet, dass Dask keine Aufgaben ausführt, bis Sie sie aufrufen.
.compute()
Dask Bags erkunden: Arbeiten mit unstrukturierten Daten
Dask-Bags sind unglaublich nützlich für die Verarbeitung von Sammlungen von Python-Objekten. Dies ist besonders vorteilhaft für die Verarbeitung von JSON-Dateien, Textdaten oder anderen unstrukturierten Datentypen. Die Funktion from_sequence bietet die Möglichkeit, Python-Iterables zu lesen. Es ermöglicht auch das Lesen von Avro-Formaten. Börsen können auch in CSV, JSON und Avro schreiben.
Dask-Taschen herstellen
Sie können eine Dash Bag erstellen, indem Sie eine JSON-Datei lesen:
import dask.bag as db
bag = db.read_text('data/*.json')
Operationen in dunklen Säcken
Dask-Taschen ermöglichen standardmäßige Mapping- und Filtervorgänge. Nachfolgend sehen Sie ein Beispiel für einen Mapping-Vorgang:
mapped_bag = bag.map(lambda x: x['slope'])
Denken Sie daran, ähnlich wie bei Arrays und Datenrahmen „.compute()“ aufzurufen, um die Operationen auszuführen:
final_output = mapped_bag.compute()
Programmieren mit Dask
Dask verwendet je nach Bedarf unterschiedliche Scheduler. Die wichtigsten Optionen sind der Single-Thread-Scheduler und der Multi-Thread- oder verteilte Scheduler.
Lokaler Programmierer
Der lokale Scheduler ist für kleine Arbeitslasten einfach zu verwenden. Es nutzt die Threading-Funktionen von Python, um Aufgaben parallel auszuführen, und ist daher ideal für schnelle Berechnungen mit mittelgroßen Datensätzen.
Verteilter Programmierer
Bei größeren Datensätzen oder intensiven Berechnungen kann die Verwendung des Moduls „dask.distributed“ die Leistung erheblich verbessern. Richten Sie einen Dask-Cluster ein, um Berechnungen über mehrere Worker hinweg zu verwalten. Hier ist eine kurze Einrichtung:
from dask.distributed import Client
client = Client() # This starts a local cluster
# or
client = Client(processes=False)
Sie können den Status Ihres Clusters und der Aufgabenübermittlung überwachen und so eine bessere Kontrolle über die Ressourcen haben. Der Multi-Node-Scheduler erfordert die Installation von Dask auf jedem Knoten sowie eine zusätzliche Konfiguration für den Scheduler und die Worker. Für einen einzelnen Knoten wird der globale Standardplaner mit dask.config.set(scheduler...) festgelegt. Befehl. Dies kann global erfolgen:
dask.config.set(scheduler='threads')
x.compute()
oder als Kontextmanager:
with dask.config.set(scheduler='threads'):
x.compute()
Mit dem poolbasierten Einzelmaschinen-Scheduler können Sie benutzerdefinierte Pools bereitstellen oder die gewünschte Anzahl an Workern angeben:
from concurrent.futures import ThreadPoolExecutor
with dask.config.set(pool=ThreadPoolExecutor(4)):
x.compute()
with dask.config.set(num_workers=4):
x.compute()
Wenn Sie an einer Dask-Cloud-Bereitstellung mit oder ohne Kubernetes interessiert sind, empfehle ich Ihnen, den Abschnitt „Bereitstellen von Dask-Clustern“ in der Dask-Dokumentation zu lesen.
Deckt Szenarien für einzelne Knoten, Cloud, Kubernetes und HPC ab. Am Ende der Dokumentation finden Sie weiterführendes Wissen, das Ihnen hier weiterhilft. Behandelt betriebliche Themen und Referenzen zur Verwendung von Coiled und Saturn Cloud als SaaS-Optionen, anstatt es selbst zu tun.
Wenn Sie Skalierbarkeitsprobleme haben, bei denen der Python-Interpreter das GIL nicht veröffentlicht, ist die Implementierung von Workern höchstwahrscheinlich obligatorisch. Sie können die Anzahl der Threads pro Worker anpassen, es wird jedoch empfohlen, einen Thread pro Prozessorkern (CPU oder vCPU) zu verwenden.
Dies kann auf mehrere oder nur einen Mitarbeiter verteilt werden. Wenn mir beispielsweise von meinem Cloud-Anbieter (offensichtlich nicht Kubernetes) 12 vCPUs zugewiesen wurden, kann ich Folgendes konfigurieren:
dask worker --nworkers 12 --nthreads 1 tcp://192.0.0.100:8786
# or
dask worker --nworkers 1 --nthreads 12 tcp://192.0.0.100:8786
Dies sind offensichtlich Befehlszeilenoptionen, die in einer Shell oder einem Skript ausgeführt werden. Die vollständige Befehlszeilenreferenz finden Sie hier . In diesen Beispielen wird die IPv4-Adresse verwendet; Die Verwendung eines Hostnamens wäre die bevorzugte Konfiguration und Dask unterstützt die Namensauflösung für DNS. Es ist auch erwähnenswert, dass Speicherlimits für Worker festgelegt werden können, wenn diese über die Befehlszeile gestartet werden.
Starten Sie den Scheduler von der Kommandozeile aus wie folgt:
$ dask scheduler
Scheduler at: tcp://192.0.0.100:8786
Sie sollten beachten, dass der Dask-Worker beim Start auf den zu verwendenden Scheduler verweist. Ein Nachteil von Dask, den Sie jedoch beheben können, ist, dass es bei einem einzelnen Programmierer einen Single Point of Failure gibt. Wenn Sie einen Cloud-Anbieter verwenden, bietet dieser Optionen zum Erstellen mehrerer Knoten, die auf eine einzelne Adressanforderung antworten. Daher kann die Untersuchung dieser Optionen den einzelnen Fehlerpunkt eindämmen.
Nach dem Starten des Schedulers und der Worker umfassen beide diagnostische Webserver und die URL stimmt mit der bereitgestellten IP oder dem bereitgestellten Host überein, wobei Port 8787 verwendet wird, sodass sie überwacht werden können. Sie sehen Aufgabendiagramme, die Arbeitsspeichernutzung des Workers und die Thread-Aktivität. Weitere Informationen zu den verfügbaren HTTP-Endpunkten finden Sie hier .
Leistungsoptimierung mit Dask
Um das Potenzial von Dask voll auszuschöpfen, ist die Leistungsoptimierung entscheidend. Hier sind zwei Strategien, die hervorragende Ergebnisse erzielen können:
Datenpartitionierung
Die ordnungsgemäße Aufteilung der Daten in überschaubare Blöcke verbessert die Leistung. Beispielsweise können in Dask DataFrames die Berechnungen beschleunigt werden, wenn die Anzahl der Partitionen explizit angegeben wird.
Jede Partition ist im Wesentlichen ein gekapselter Pandas-Datenrahmen, da die Funktionen des Dask-Datenrahmens eine skalierte Verwendung von Pandas ermöglichen. Wenn Sie beispielsweise einen Datensatz in einem Dask-Datenrahmen mit 1 Milliarde Zeilen haben und 100 Partitionen wünschen, erstellt Dask auf versteckte Weise 10 Millionen Pandas-Datenrahmen.
Dask kümmert sich um den Datenzugriff auf hoher Ebene, aber jeder zugrunde liegende Datenrahmen ist eine Pandas-Implementierung. Sie können das Pandas-Argumentschlüsselwort verwenden, es gibt jedoch Einschränkungen, die von der von Dask verwendeten Pandas-Version abhängen.
So vermeiden Sie zu viele kleine Aufgaben
Das Erstellen zu kleiner Aufgaben kann zu Ineffizienzen führen. Jede Aufgabe bringt einen Mehraufwand mit sich, und wenn die Aufgaben zu granular sind, können die Vorteile der parallelen Verarbeitung zunichte gemacht werden. Eine kleinere Anzahl größerer Aufgaben ist für Dask im großen Maßstab besser geeignet.
Häufige Anwendungsfälle für Dask
Dask ist in verschiedenen Szenarien zur Datenverarbeitung und -analyse anwendbar:
Groß angelegte Datenanalyse
Dask DataFrames eignen sich ideal für die Analyse großer Datensätze, die die Speichergrenzen überschreiten, und funktionieren für Standardanalysen ähnlich wie Pandas. Überprüfen Sie die von Dask verwendete Pandas-Version, um herauszufinden, welche Funktionen unterstützt werden.
Maschinelles Lernen
Die Dask-Integration mit beliebten Bibliotheken wie Scikit-learn ist unkompliziert. Beispielsweise können Sie Dask-ML verwenden, um Machine-Learning-Aufgaben auf großen Datensätzen effizient zu skalieren und so die Verarbeitungszeiten erheblich zu verbessern. Dask bietet eine verteilte parallele Verarbeitung von Python-Funktionen. Wenn Sie also Funktionen haben, können Sie diese mithilfe eines Dekorators parallelisieren. Allerdings hat die Verschachtelung auch ihre Grenzen.
Datenaufnahme und -transformation
Dask vereinfacht das Lesen und Transformieren großer Datensätze, was für die Vorverarbeitungsphasen vor der Analyse von entscheidender Bedeutung ist, und ermöglicht Ihnen die problemlose Handhabung mehrerer Formate.
Nutzen Sie Dask für Ihre Daten-Workflows
Die Nutzung von Dask für die skalierbare Datenverarbeitung und -analyse bietet erhebliche Vorteile, insbesondere bei der Bewältigung groß angelegter Datenoperationen. Wenn Sie die wichtigsten Funktionen verstehen, können Sie Dask effektiv in Ihren Python-Data-Science-Workflow integrieren.
Dank der parallelen Verarbeitungsfunktionen von Dask kann es Vorgänge bewältigen, die herkömmliche Tools belasten. Ob Sie mit großen Arrays, DataFrames oder unstrukturierten Daten arbeiten, Dask eröffnet neue Wege zur effizienten Datenmanipulation.
Sie sind jetzt gut gerüstet, um die Leistung von Dask zu nutzen und Ihre Datenverarbeitungsaufgaben effizient und skalierbar zu gestalten.
Wenn Sie die skalierbare Datenverarbeitung weiter erkunden, sollten Sie mit Dask experimentieren, um zu sehen, wie es Ihren Arbeitsablauf optimieren kann. Viel Spaß beim Programmieren!