Apache Iceberg und Pandas Analytics: Teil III
- Claude Paugh
- 11. Mai
- 5 Min. Lesezeit
Aktualisiert: 24. Juni
Die beiden vorherigen Artikel befassten sich mit der Bewertung von Apache Iceberg und seinen Funktionen sowie der Verwendung von PyIceberg zum Erstellen von Objekten und Laden von Daten. Dieser Beitrag konzentriert sich auf das Abrufen von Daten und die Verwendung von Pandas-Datenrahmen zur Erstellung von Analysen.

Python Pandas und Apache Iceberg
Was Python-Pakete angeht, ist Pandas seit seiner Veröffentlichung eines der beliebtesten. Seine Stärke liegt in der Bereitstellung mehrdimensionaler Arrays zur Datendarstellung, auch bekannt als Tabellen, und es unterstützt viele einfache bis fortgeschrittene Berechnungen/Aggregationen, die häufig von Absolventen oder Nicht-Leuten aus der Tabellenkalkulation genutzt werden. Mit der Weiterentwicklung der Data Science hat auch die Popularität von Pandas zugenommen.
Pandas nutzt das Python-Paket Numpy für Arrays, Typen und mathematische Funktionen. Sie können die DataFrames mithilfe verschiedener Techniken filtern, segmentieren, unterteilen und erweitern. Bei richtiger Planung können Sie mehrere Pandas DataFrames effektiv mehrdimensionale Würfel nachbilden lassen. Nicht unbedingt zusammenhängend, aber funktional.

Pandas verfügt über verschiedene „ read_ “-Methoden in der Klasse, die es ermöglichen, aus einer Eingabe einen Datenrahmen zu erstellen. Beispiele hierfür sind CSV, JSON, Parquet, durch Trennzeichen getrennte Dateien, SQL-Anweisungen/Ergebnismengen, Apache AVRO & ORC, HDF5, Puffer, Pickle-Dateien, HTML sowie Listen und Wörterbücher in Python, um nur einige Quellen zu nennen, aus denen Pandas Daten lesen und verarbeiten kann.
Pandas benannte Aggregate
Nachdem ich in meinem vorherigen Beitrag die Iceberg-Tabellen definiert und geladen hatte, erschien mir eine Analyseübung mit Pandas als guter Test für einen realen Anwendungsfall. Ich begann mit den Grundlagen des Datenabrufs und -abrufs:
from get_catalog import get_catalog
import pandas as pd
from pyiceberg.expressions import GreaterThan, GreaterThanOrEqual, EqualTo, NotEqualTo
import matplotlib.pyplot as plt
import seaborn as sns
# Pandas output display formatting
pd.options.display.max_rows = 50
pd.options.display.max_columns = 50
pd.options.display.width = 1000
pd.options.display.colheader_justify = 'center'
pd.options.display.precision = 4
pd.options.display.float_format = '{:,.2f}'.format
# Get catalog
catalog = get_catalog()
# List tables in docs namespace
catalog.list_tables("docs")
# Load tables from Iceberg docs namespace
company = catalog.load_table("docs.company")
ratios_ttm = catalog.load_table("docs.ratios_ttm")
forecasts = catalog.load_table("docs.forecasts")
# Apply filters, convert dates, and convert to pandas
company_tech = company.scan(row_filter=(EqualTo('Sector', 'Technology')))
company_tech_usd = company_tech.filter(EqualTo('Currency','USD'))
company_tech_use_df = company_tech_usd.to_pandas()
company_df = company.scan().to_pandas()
Die Kommentarzeile " Filter anwenden... " oben bietet einen Hinweis für PyIceberg. Sie können anhand des Imports von pyiceberg.expressions , dass einige Ausdruckstypen importiert wurden. Es handelt sich im Wesentlichen um vier Ausdrücke, die sowohl für die numerische als auch für die String-Auswertung verwendet werden. Der erste Zeilenfilter zielt ausschließlich auf den Technologiesektor ab, indem er den Ausdruck „EqualTo“ aus dem Import verwendet. Der zweite Filter beschränkt die Zeilen auf die Währung USD. Die Syntax unterscheidet sich geringfügig, da unterschiedliche Objekte verwendet werden. Das erste Objekt ist eine Tabelle, die gescannt und gefiltert wird, während das zweite ein DataScan-Objekt ist, das gefiltert wird.

Der nächste Abschnitt führt eine Typkonvertierung für Pandas durch, und Dataframes werden zusammengeführt, um neue Objekte zu erstellen. Ich habe einen Pandas-Filter für das Objekt company_df verwendet, um einen Kontrast zum obigen Filter mit den Methoden pyiceberg.expressions zu schaffen. Jede Spalte im Dataframe ist als Objekt typisiert. Um numerische Berechnungen durchführen zu können, muss ich die Spalten daher in einen numerischen Typ (Float) konvertieren. Außerdem führe ich einen weiteren Scan der Iceberg-Tabelle „Forecasts“ durch, um nach „Dividend_Per_Share_Growth“ zu filtern und die Ausgabe anschließend in Pandas zu konvertieren.
# Pandas filters; set date datatypes
company_df = company_df[company_df['Currency'] == 'USD']
company_df['AsOfDate'] = pd.to_datetime(company_df['AsOfDate'], format='%Y-%m-%d').dt.date
ratios_ttm_df = ratios_ttm.scan(row_filter=GreaterThan('Cash_Per_Share', 0.01)).to_pandas()
ratios_ttm_df['AsOfDate'] = pd.to_datetime(ratios_ttm_df['AsOfDate'], format='%Y-%m-%d').dt.date
forecasts_df = forecasts.scan(row_filter=GreaterThanOrEqual('Dividend_Per_Share_Growth', 0.000)).to_pandas()
forecasts_df['AsOfDate'] = pd.to_datetime(forecasts_df['AsOfDate'], format='%Y-%m-%d').dt.date
# Merge tables and clean up duplicates
company_ratios_df = pd.merge(company_df, ratios_ttm_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_forecast_df = pd.merge(company_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_ratios_forecast_df = pd.merge(company_ratios_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
Der letzte Abschnitt des Codes führt eine Datenrahmenzusammenführung mithilfe der Spalte „Ticker“ in den Datenrahmen durch und löscht Duplikate, indem dieselbe Spalte als Teilmenge verwendet wird. Ich setze den Index zurück, sonst funktionieren meine nächsten Operationen nicht. Für SQL-Benutzer ähnelt die Zusammenführung einer gemeinsamen Spalte mit „how='inner'“ funktional einem Inner Join zwischen zwei Tabellen in einem RDBMS. Die Ergebnisse dieser Operationen liefern einen erweiterten Datensatz zur weiteren Verwendung.
Der nächste Abschnitt befasst sich lediglich mit der Spaltentypisierung für Float-Objekte. Daher überspringe ich diesen Abschnitt und gehe direkt zu den Pandas-„Benannten Aggregaten“ über. Wie Sie unten sehen, werden mit dem Pandas-Alias „pd“ aus dem Import benannte Aggregate erstellt.
# company + ratios + forecast named aggregates
company_cash_agg = pd.NamedAgg(column='Cash_Per_Share', aggfunc='mean')
company_tangible_agg = pd.NamedAgg(column='Tangible_Book_Value_Per_Share', aggfunc='median')
company_free_cash_agg = pd.NamedAgg(column='Free_Cash_Flow_Per_Share', aggfunc='mean')
company_discounted_agg = pd.NamedAgg(column='Discounted_Cash_Flow', aggfunc='mean')
company_yield_agg = pd.NamedAgg(column='Free_Cash_Flow_Yield_PCT', aggfunc='mean')
company_pe_agg = pd.NamedAgg(column='Price_To_Earnings', aggfunc='median')
company_roe_agg = pd.NamedAgg(column='Return_On_Equity_PCT', aggfunc='median')
Die Ergebnisse für jedes Aggregat werden berechnet, indem ein Dataframe mit den Spalten verwendet und eine Gruppierung hinzugefügt wird. Die benannten Ergebnisse, z. B. „mean_10yr_cash“ usw., sind die Spaltennamen für den resultierenden Aggregat-Dataframe. Sie werden unten feststellen, dass es möglich ist, mehrere benannte Aggregat-Ergebnisse aus dem einzelnen Dataframe „Gruppierung“ zu erhalten; im Grunde eine Anweisung.
# Get results for each aggregate
company_cash_all = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(mean_10yr_cash=company_cash_agg, mean_10yr_free_cash=company_free_cash_agg,
mean_10yr_discounted_cash=company_discounted_agg,
mean_10yr_cash_yield=company_yield_agg)
company_metrics = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(median_tangible_bv_share=company_tangible_agg, median_pe=company_pe_agg, median_roe_pct=company_roe_agg)
# Non named aggregates
df_div = forecasts_df.groupby('CalendarYear')['Dividend_Per_Share_Growth'].agg(['mean', 'median', 'quantile'])
df_sga = forecasts_df.groupby('CalendarYear')['Debt_Growth_PCT'].agg(['mean', 'median', 'quantile'])
Ergebnisse:




Pandas-Ergebnisse grafisch darstellen
Ich habe beschlossen, die aggregierte Ausgabe des Beispiels besser zu veranschaulichen, indem ich die Ergebnisse mit matplotlib und seaborn grafisch darstelle. Manchmal liefert eine visuelle Darstellung Kontextinformationen, die nur implizit bei der Betrachtung der Ausgabeberechnungen ersichtlich sind.

Der folgende Codeblock enthält Kommentare, die Ihnen den Kontext für die Darstellung der resultierenden Datensätze erläutern. Mit Matplotlib-Kenntnissen ist dies relativ einfach. Andernfalls müssen Sie möglicherweise die Syntax nachschlagen. Wenn Sie wissen, wie man Diagramme erstellt, ist dies relativ intuitiv, da die Bibliotheken die in der Diagrammerstellung verwendete Terminologie verwenden.
# 1. Dividend Growth and Debt Growth Over Time
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# Dividend Growth Plot
df_div.plot(ax=ax1, marker='o')
ax1.set_title('Dividend Per Share Growth Over Time')
ax1.set_xlabel('Calendar Year')
ax1.set_ylabel('Growth Rate')
ax1.legend(['Mean', 'Median', 'Quantile'])
ax1.grid(True)
# Debt Growth Plot
df_sga.plot(ax=ax2, marker='o')
ax2.set_title('Debt Growth Percentage Over Time')
ax2.set_xlabel('Calendar Year')
ax2.set_ylabel('Growth Rate (%)')
ax2.legend(['Mean', 'Median', 'Quantile'])
ax2.grid(True)
plt.tight_layout()
plt.show()
# 2. Company Metrics Distribution
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Cash Metrics
sns.boxplot(data=company_cash_all, ax=ax1)
ax1.set_title('Distribution of Cash Metrics')
ax1.tick_params(axis='x', rotation=45)
# Company Performance Metrics
sns.boxplot(data=company_metrics, ax=ax2)
ax2.set_title('Distribution of Company Performance Metrics')
ax2.tick_params(axis='x', rotation=45)
# Top 10 Companies by Free Cash Flow Yield
top_cash_yield = company_cash_all.nlargest(10, 'mean_10yr_cash_yield')
sns.barplot(data=top_cash_yield.reset_index(), x='CompanyName', y='mean_10yr_cash_yield', ax=ax3)
ax3.set_title('Top 10 Companies by Free Cash Flow Yield')
ax3.tick_params(axis='x', rotation=45)
# Top 10 Companies by ROE
top_roe = company_metrics.nlargest(10, 'median_roe_pct')
sns.barplot(data=top_roe.reset_index(), x='CompanyName', y='median_roe_pct', ax=ax4)
ax4.set_title('Top 10 Companies by Return on Equity')
ax4.tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 3. Correlation Matrix of Key Metrics; Combine relevant metrics
correlation_metrics = pd.merge(
company_cash_all.reset_index(),
company_metrics.reset_index(),
on=['Ticker', 'CompanyName']
)
correlation_cols = [col for col in correlation_metrics.columns
if col not in ['Ticker', 'CompanyName']]
correlation_matrix = correlation_metrics[correlation_cols].corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix of Company Metrics')
plt.tight_layout()
plt.show()



Ich hoffe, ich konnte Ihnen einen Einblick in einen möglichen Anwendungsfall für die Analyse mit Pandas und Apache Iceberg geben. Es gibt weitere Varianten, die ich in naher Zukunft wahrscheinlich untersuchen werde. Beginnen wir mit Apache Spark und Iceberg. Dabei werden Quelldaten aus Iceberg gelesen, Aggregate und Berechnungen in Spark durchgeführt und die resultierende Ausgabe zurück in Iceberg geschrieben. Vielen Dank fürs Lesen.
