top of page

Apache Iceberg et Pandas Analytics : Partie 3

Dernière mise à jour : 22 juin

Les deux articles précédents portaient sur l'évaluation d'Apache Iceberg et de ses fonctionnalités, ainsi que sur l'utilisation de PyIceberg pour créer des objets et charger des données. Cet article se concentrera sur l'extraction de données et l'utilisation des dataframes Pandas pour créer des analyses.
Pandas Analytics
Pandas Analytics

Python Pandas et Apache Iceberg

Parmi les packages Python, Pandas a été l'un des plus populaires depuis sa sortie initiale. Son point fort était de fournir des tableaux multidimensionnels pour représenter les données, appelés tables, et de prendre en charge de nombreux calculs et agrégations, de base à avancés, couramment utilisés par ceux qui ont abandonné les tableurs ou qui n'en avaient tout simplement pas envie. Avec le développement de la science des données au fil des ans, la popularité de Pandas a également augmenté.


Pandas utilise largement le package Python numpy pour les tableaux, les types et les fonctions mathématiques. Vous pouvez également filtrer, découper, sous-ensembleer et étendre les dataframes grâce à diverses techniques. Avec une planification adéquate, vous pouvez efficacement créer plusieurs dataframes Pandas imitant des cubes multidimensionnels. Pas nécessairement de manière cohérente, mais fonctionnellement.

pandas lisant des fichiers
Pandas reading files

Pandas dispose de plusieurs méthodes « read_ » dans sa classe permettant de récupérer des données et de créer un dataframe à partir de ces données. Par exemple, les formats CSV, JSON, Parquet, les fichiers délimités, les instructions/ensembles de résultats SQL, Apache AVRO et ORC, HDF5, les tampons, les fichiers Pickle, le HTML et les listes ou dictionnaires Python sont autant de sources que Pandas peut lire et analyser.


Les pandas nommés agrégats

Après avoir défini et chargé les tables Iceberg, dans mon article précédent , réaliser un exercice d'analyse avec Pandas semblait être un bon test pour un cas d'utilisation réel. J'ai commencé par les bases de l'obtention et de la récupération des données :

from get_catalog import get_catalog
import pandas as pd
from pyiceberg.expressions import GreaterThan, GreaterThanOrEqual, EqualTo, NotEqualTo
import matplotlib.pyplot as plt
import seaborn as sns

# Pandas output display formatting
pd.options.display.max_rows = 50
pd.options.display.max_columns = 50
pd.options.display.width = 1000
pd.options.display.colheader_justify = 'center'
pd.options.display.precision = 4
pd.options.display.float_format = '{:,.2f}'.format

# Get catalog
catalog = get_catalog()

# List tables in docs namespace
catalog.list_tables("docs")

# Load tables from Iceberg docs namespace
company = catalog.load_table("docs.company")
ratios_ttm = catalog.load_table("docs.ratios_ttm")
forecasts = catalog.load_table("docs.forecasts")

# Apply filters, convert dates, and convert to pandas
company_tech = company.scan(row_filter=(EqualTo('Sector', 'Technology')))
company_tech_usd = company_tech.filter(EqualTo('Currency','USD'))
company_tech_use_df = company_tech_usd.to_pandas()
company_df = company.scan().to_pandas()

La ligne de commentaire « Appliquer les filtres… » ci-dessus est un élément à souligner concernant PyIceberg. On peut le constater à partir de l'importation de pyiceberg.expressions indique que plusieurs types d'expressions ont été importés. Il s'agit essentiellement de quatre expressions utilisées pour l'évaluation numérique et de chaînes. Le premier filtre row_filter cible uniquement le secteur technologique en utilisant l'expression « EqualTo » de l'importation. Le second filtre limite les lignes à la devise USD. La syntaxe est légèrement différente car les objets utilisés sont différents. Le premier objet est une table analysée et filtrée, tandis que le second est un objet DataScan filtré.

pandas calculant les totaux
Pandas calculating totals

La section suivante effectue la conversion de type pour Pandas, et les dataframes sont fusionnés pour créer de nouveaux objets. J'ai utilisé un filtre Pandas sur l'objet company_df , par contraste avec le filtre précédent utilisant les méthodes pyiceberg.expressions . Chaque colonne du dataframe est typée comme un objet ; pour effectuer des calculs numériques, je dois donc convertir les colonnes en type numérique (float). J'effectue également une autre analyse de la table « prévisions » d'Iceberg afin de filtrer « Dividend_Per_Share_Growth » , puis de convertir le résultat en Pandas.

# Pandas filters; set date datatypes
company_df = company_df[company_df['Currency'] == 'USD']
company_df['AsOfDate'] = pd.to_datetime(company_df['AsOfDate'], format='%Y-%m-%d').dt.date

ratios_ttm_df = ratios_ttm.scan(row_filter=GreaterThan('Cash_Per_Share', 0.01)).to_pandas()
ratios_ttm_df['AsOfDate'] = pd.to_datetime(ratios_ttm_df['AsOfDate'], format='%Y-%m-%d').dt.date

forecasts_df = forecasts.scan(row_filter=GreaterThanOrEqual('Dividend_Per_Share_Growth', 0.000)).to_pandas()
forecasts_df['AsOfDate'] = pd.to_datetime(forecasts_df['AsOfDate'], format='%Y-%m-%d').dt.date

# Merge tables and clean up duplicates
company_ratios_df = pd.merge(company_df, ratios_ttm_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_forecast_df = pd.merge(company_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_ratios_forecast_df = pd.merge(company_ratios_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)

La dernière section du code fusionne les dataframes à l'aide de la colonne « Ticker » et supprime les doublons en utilisant la même colonne comme sous-ensemble. Je réinitialise l'index, sinon mes opérations suivantes échouent. Pour les utilisateurs SQL, la fusion sur une colonne commune à l'aide de « how='inner' » est fonctionnellement similaire à une jointure interne entre deux tables dans un SGBDR. Le résultat de ces opérations fournira un ensemble de données étendu pour une utilisation ultérieure.


La section suivante concerne simplement la saisie de colonnes supplémentaires pour les variables flottantes. Je vais donc la sauter et passer aux « agrégats nommés » de Pandas. Comme vous pouvez le voir ci-dessous, des agrégats nommés sont créés à partir de l'alias Pandas « pd » issu de l'importation.

# company + ratios + forecast named aggregates
company_cash_agg = pd.NamedAgg(column='Cash_Per_Share', aggfunc='mean')
company_tangible_agg = pd.NamedAgg(column='Tangible_Book_Value_Per_Share', aggfunc='median')
company_free_cash_agg = pd.NamedAgg(column='Free_Cash_Flow_Per_Share', aggfunc='mean')
company_discounted_agg = pd.NamedAgg(column='Discounted_Cash_Flow', aggfunc='mean')
company_yield_agg = pd.NamedAgg(column='Free_Cash_Flow_Yield_PCT', aggfunc='mean')
company_pe_agg = pd.NamedAgg(column='Price_To_Earnings', aggfunc='median')
company_roe_agg = pd.NamedAgg(column='Return_On_Equity_PCT', aggfunc='median')

Les résultats de chaque agrégat sont calculés à l'aide d'un dataframe contenant la ou les colonnes et en ajoutant un groupement. Les résultats nommés, par exemple « mean_10yr_cash », etc., constitueront les noms de colonnes du dataframe agrégé résultant. Vous remarquerez ci-dessous qu'il est possible d'obtenir plusieurs résultats agrégés nommés à partir d'un seul dataframe ; il s'agit en fait d'une seule instruction.

# Get results for each aggregate
company_cash_all = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(mean_10yr_cash=company_cash_agg,                          mean_10yr_free_cash=company_free_cash_agg,
mean_10yr_discounted_cash=company_discounted_agg,
mean_10yr_cash_yield=company_yield_agg)

company_metrics = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(median_tangible_bv_share=company_tangible_agg, median_pe=company_pe_agg, median_roe_pct=company_roe_agg)

# Non named aggregates
df_div = forecasts_df.groupby('CalendarYear')['Dividend_Per_Share_Growth'].agg(['mean', 'median', 'quantile'])
df_sga = forecasts_df.groupby('CalendarYear')['Debt_Growth_PCT'].agg(['mean', 'median', 'quantile'])

Résultats:

Agrégats de trésorerie de l'entreprise
Company Cash Aggregates
agrégats de mesures d'entreprise
Company Metrics Aggregates













agrégats de croissance du dividende par action
Dividend Per Share Growth
agrégats de pourcentage de croissance de la dette
Debt Growth PCT














Graphique des résultats de Pandas

J'ai décidé de mieux illustrer l'exemple de sortie agrégée en représentant graphiquement les résultats à l'aide de Matplotlib et de Seaborn . Parfois, un visuel fournit des informations contextuelles qui ne sont qu'implicites lors de l'examen des calculs de sortie.

pandas créant des graphiques
Pandas creating graphs

Le bloc de code ci-dessous contient des commentaires qui vous donneront le contexte du tracé des ensembles de données obtenus. C'est relativement simple si vous connaissez Matplotlib ; sinon, vous devrez peut-être vous renseigner sur la syntaxe. Si vous savez créer des graphiques, c'est assez intuitif, car les bibliothèques utilisent la terminologie utilisée pour les graphiques.

# 1. Dividend Growth and Debt Growth Over Time
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Dividend Growth Plot
df_div.plot(ax=ax1, marker='o')
ax1.set_title('Dividend Per Share Growth Over Time')
ax1.set_xlabel('Calendar Year')
ax1.set_ylabel('Growth Rate')
ax1.legend(['Mean', 'Median', 'Quantile'])
ax1.grid(True)

# Debt Growth Plot
df_sga.plot(ax=ax2, marker='o')
ax2.set_title('Debt Growth Percentage Over Time')
ax2.set_xlabel('Calendar Year')
ax2.set_ylabel('Growth Rate (%)')
ax2.legend(['Mean', 'Median', 'Quantile'])
ax2.grid(True)

plt.tight_layout()
plt.show()

# 2. Company Metrics Distribution
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

# Cash Metrics
sns.boxplot(data=company_cash_all, ax=ax1)
ax1.set_title('Distribution of Cash Metrics')
ax1.tick_params(axis='x', rotation=45)

# Company Performance Metrics
sns.boxplot(data=company_metrics, ax=ax2)
ax2.set_title('Distribution of Company Performance Metrics')
ax2.tick_params(axis='x', rotation=45)

# Top 10 Companies by Free Cash Flow Yield
top_cash_yield = company_cash_all.nlargest(10, 'mean_10yr_cash_yield')
sns.barplot(data=top_cash_yield.reset_index(), x='CompanyName', y='mean_10yr_cash_yield', ax=ax3)
ax3.set_title('Top 10 Companies by Free Cash Flow Yield')
ax3.tick_params(axis='x', rotation=45)

# Top 10 Companies by ROE
top_roe = company_metrics.nlargest(10, 'median_roe_pct')
sns.barplot(data=top_roe.reset_index(), x='CompanyName', y='median_roe_pct', ax=ax4)
ax4.set_title('Top 10 Companies by Return on Equity')
ax4.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# 3. Correlation Matrix of Key Metrics; Combine relevant metrics
correlation_metrics = pd.merge(
    company_cash_all.reset_index(),
    company_metrics.reset_index(),
    on=['Ticker', 'CompanyName']
)

correlation_cols = [col for col in correlation_metrics.columns
                   if col not in ['Ticker', 'CompanyName']]
correlation_matrix = correlation_metrics[correlation_cols].corr()

plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix of Company Metrics')
plt.tight_layout()
plt.show()
matrice de corrélation

graphiques à barres
Bar graphs

graphiques linéaires
Line graphs
J'espère avoir pu vous éclairer sur un cas d'utilisation analytique potentiel avec Pandas et Apache Iceberg. J'explorerai probablement d'autres variantes prochainement, à commencer par Apache Spark et Iceberg : les données sources sont lues depuis Iceberg, les agrégats et les calculs sont effectués dans Spark, et le résultat est réécrit dans Iceberg. Merci de votre lecture.
pandas faisant signe au revoir
Pandas waving bye


bottom of page