top of page

Análisis de Apache Iceberg y Pandas: Parte III

  • Foto del escritor: Claude Paugh
    Claude Paugh
  • 11 may
  • 5 Min. de lectura

Actualizado: 20 may

Los dos artículos anteriores trataron sobre la evaluación de Apache Iceberg y sus capacidades, así como sobre el uso de PyIceberg para crear objetos y cargar datos. Esta publicación se centrará en la extracción de datos y el uso de dataframes de Pandas para crear análisis.

Análisis de Pandas
Pandas Analytics

Pandas de Python y Iceberg de Apache

En cuanto a los paquetes de Python, Pandas ha sido uno de los más populares desde su lanzamiento. Su punto óptimo era proporcionar matrices multidimensionales para representar datos (tablas) y admite numerosos cálculos y agregaciones, desde básicos hasta avanzados, que suelen usar quienes se han graduado de las hojas de cálculo o simplemente no les gustan. A medida que la ciencia de datos se ha desarrollado a lo largo de los años, la popularidad de Pandas también ha crecido.


Pandas utiliza en gran medida el paquete de Python numpy para matrices, tipos y funciones matemáticas. También puedes filtrar, segmentar, crear subconjuntos y extender los dataframes mediante diversas técnicas. Si lo planificas correctamente, puedes lograr que varios DataFrames de Pandas imiten cubos multidimensionales. No necesariamente de forma cohesiva, sino funcional.

pandas leyendo archivos
Pandas reading files

Pandas cuenta con varios métodos " read_ " en la clase que permiten tomar una entrada y crear un marco de datos a partir de ella. Por ejemplo, CSV, JSON, Parquet, archivos delimitados, sentencias SQL/conjuntos de resultados, Apache AVRO y ORC, HDF5, búferes, archivos pickle, HTML y listas o diccionarios en Python, entre otras fuentes que Pandas puede leer y procesar.


Agregados con nombre de Pandas

Tras definir y cargar las tablas Iceberg, en mi publicación anterior , realizar un ejercicio de análisis con Pandas me pareció una buena prueba de algo que podría representar un caso de uso real. Empecé con los fundamentos de la obtención y recuperación de datos:

from get_catalog import get_catalog
import pandas as pd
from pyiceberg.expressions import GreaterThan, GreaterThanOrEqual, EqualTo, NotEqualTo
import matplotlib.pyplot as plt
import seaborn as sns

# Pandas output display formatting
pd.options.display.max_rows = 50
pd.options.display.max_columns = 50
pd.options.display.width = 1000
pd.options.display.colheader_justify = 'center'
pd.options.display.precision = 4
pd.options.display.float_format = '{:,.2f}'.format

# Get catalog
catalog = get_catalog()

# List tables in docs namespace
catalog.list_tables("docs")

# Load tables from Iceberg docs namespace
company = catalog.load_table("docs.company")
ratios_ttm = catalog.load_table("docs.ratios_ttm")
forecasts = catalog.load_table("docs.forecasts")

# Apply filters, convert dates, and convert to pandas
company_tech = company.scan(row_filter=(EqualTo('Sector', 'Technology')))
company_tech_usd = company_tech.filter(EqualTo('Currency','USD'))
company_tech_use_df = company_tech_usd.to_pandas()
company_df = company.scan().to_pandas()

La línea de comentario " Aplicar filtros... " anterior ofrece algo que destacar en lo que respecta a PyIceberg. Puedes ver en la importación de pyiceberg.expressions indica que se han importado algunos tipos de expresión para su uso. Se trata básicamente de cuatro expresiones utilizadas para la evaluación numérica y de cadenas. El primer filtro de fila (row_filter) se centra únicamente en el sector tecnológico mediante la expresión "EqualTo" de la importación. El segundo filtro limita las filas a USD. La sintaxis es ligeramente diferente debido a que los objetos utilizados son distintos. El primer objeto es una tabla que se escanea y filtra, mientras que el segundo es un objeto DataScan que se filtra.

pandas calculando totales
Pandas calculating totals

La siguiente sección realiza la conversión de tipos para Pandas y los dataframes se fusionan para crear nuevos objetos. Utilicé un filtro de Pandas en el objeto company_df , a modo de contraste con el filtro anterior que utiliza los métodos pyiceberg.expressions . Cada columna del dataframe tiene el tipo de un objeto, por lo que, para realizar cálculos numéricos, necesito convertir las columnas para que utilicen un tipo numérico (float). También realizo otro análisis en la tabla "pronósticos" de Iceberg para filtrar por "Dividend_Per_Share_Growth" y luego convierto la salida a Pandas.

# Pandas filters; set date datatypes
company_df = company_df[company_df['Currency'] == 'USD']
company_df['AsOfDate'] = pd.to_datetime(company_df['AsOfDate'], format='%Y-%m-%d').dt.date

ratios_ttm_df = ratios_ttm.scan(row_filter=GreaterThan('Cash_Per_Share', 0.01)).to_pandas()
ratios_ttm_df['AsOfDate'] = pd.to_datetime(ratios_ttm_df['AsOfDate'], format='%Y-%m-%d').dt.date

forecasts_df = forecasts.scan(row_filter=GreaterThanOrEqual('Dividend_Per_Share_Growth', 0.000)).to_pandas()
forecasts_df['AsOfDate'] = pd.to_datetime(forecasts_df['AsOfDate'], format='%Y-%m-%d').dt.date

# Merge tables and clean up duplicates
company_ratios_df = pd.merge(company_df, ratios_ttm_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_forecast_df = pd.merge(company_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_ratios_forecast_df = pd.merge(company_ratios_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)

La última sección del código realiza una fusión de dataframes utilizando la columna "Ticker" y elimina los duplicados que utilizan la misma columna como subconjunto. Restablezco el índice; de lo contrario, mis siguientes operaciones no funcionarán. Para los usuarios de SQL, la fusión en una columna común mediante "how='inner'" es funcionalmente similar a una unión interna entre dos tablas en un RDBMS. Los resultados de estas operaciones proporcionarán un conjunto de datos ampliado para su uso posterior.


La siguiente sección trata sobre la escritura de columnas para flotantes, así que la omitiré y pasaré a los "Agregados con Nombre" de Pandas. Como puede ver a continuación, se crean los agregados con nombre usando el alias de Pandas de la importación, "pd".

# company + ratios + forecast named aggregates
company_cash_agg = pd.NamedAgg(column='Cash_Per_Share', aggfunc='mean')
company_tangible_agg = pd.NamedAgg(column='Tangible_Book_Value_Per_Share', aggfunc='median')
company_free_cash_agg = pd.NamedAgg(column='Free_Cash_Flow_Per_Share', aggfunc='mean')
company_discounted_agg = pd.NamedAgg(column='Discounted_Cash_Flow', aggfunc='mean')
company_yield_agg = pd.NamedAgg(column='Free_Cash_Flow_Yield_PCT', aggfunc='mean')
company_pe_agg = pd.NamedAgg(column='Price_To_Earnings', aggfunc='median')
company_roe_agg = pd.NamedAgg(column='Return_On_Equity_PCT', aggfunc='median')

Los resultados de cada agregado se calculan utilizando un marco de datos que contiene las columnas y añadiendo un grupo por. Los resultados con nombre, por ejemplo, "media_10años_efectivo", serán los nombres de columna del marco de datos agregado resultante. Observará a continuación que es posible obtener varios resultados agregados con nombre a partir de un único marco de datos agrupado por; básicamente, una sola instrucción.

# Get results for each aggregate
company_cash_all = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(mean_10yr_cash=company_cash_agg,                          mean_10yr_free_cash=company_free_cash_agg,
mean_10yr_discounted_cash=company_discounted_agg,
mean_10yr_cash_yield=company_yield_agg)

company_metrics = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(median_tangible_bv_share=company_tangible_agg, median_pe=company_pe_agg, median_roe_pct=company_roe_agg)

# Non named aggregates
df_div = forecasts_df.groupby('CalendarYear')['Dividend_Per_Share_Growth'].agg(['mean', 'median', 'quantile'])
df_sga = forecasts_df.groupby('CalendarYear')['Debt_Growth_PCT'].agg(['mean', 'median', 'quantile'])

Resultados:

Agregados de efectivo de la empresa
Company Cash Aggregates
agregados de métricas de la empresa
Company Metrics Aggregates













agregados de crecimiento del dividendo por acción
Dividend Per Share Growth
porcentajes agregados de crecimiento de la deuda
Debt Growth PCT














Graficar los resultados de Pandas

Decidí ilustrar mejor el ejemplo de salida agregada graficando los resultados con matplotlib y seaborn . A veces, un elemento visual proporciona información contextual que solo se intuye al observar los cálculos de salida.

pandas creando gráficos
Pandas creating graphs

El bloque de código a continuación contiene comentarios que le brindarán el contexto en el que se grafican los conjuntos de datos resultantes. Es bastante sencillo si tiene conocimientos de matplotlib; de lo contrario, podría necesitar consultar la sintaxis. Si sabe cómo crear gráficos, es bastante intuitivo, ya que las bibliotecas utilizan terminología gráfica.

# 1. Dividend Growth and Debt Growth Over Time
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Dividend Growth Plot
df_div.plot(ax=ax1, marker='o')
ax1.set_title('Dividend Per Share Growth Over Time')
ax1.set_xlabel('Calendar Year')
ax1.set_ylabel('Growth Rate')
ax1.legend(['Mean', 'Median', 'Quantile'])
ax1.grid(True)

# Debt Growth Plot
df_sga.plot(ax=ax2, marker='o')
ax2.set_title('Debt Growth Percentage Over Time')
ax2.set_xlabel('Calendar Year')
ax2.set_ylabel('Growth Rate (%)')
ax2.legend(['Mean', 'Median', 'Quantile'])
ax2.grid(True)

plt.tight_layout()
plt.show()

# 2. Company Metrics Distribution
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

# Cash Metrics
sns.boxplot(data=company_cash_all, ax=ax1)
ax1.set_title('Distribution of Cash Metrics')
ax1.tick_params(axis='x', rotation=45)

# Company Performance Metrics
sns.boxplot(data=company_metrics, ax=ax2)
ax2.set_title('Distribution of Company Performance Metrics')
ax2.tick_params(axis='x', rotation=45)

# Top 10 Companies by Free Cash Flow Yield
top_cash_yield = company_cash_all.nlargest(10, 'mean_10yr_cash_yield')
sns.barplot(data=top_cash_yield.reset_index(), x='CompanyName', y='mean_10yr_cash_yield', ax=ax3)
ax3.set_title('Top 10 Companies by Free Cash Flow Yield')
ax3.tick_params(axis='x', rotation=45)

# Top 10 Companies by ROE
top_roe = company_metrics.nlargest(10, 'median_roe_pct')
sns.barplot(data=top_roe.reset_index(), x='CompanyName', y='median_roe_pct', ax=ax4)
ax4.set_title('Top 10 Companies by Return on Equity')
ax4.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# 3. Correlation Matrix of Key Metrics; Combine relevant metrics
correlation_metrics = pd.merge(
    company_cash_all.reset_index(),
    company_metrics.reset_index(),
    on=['Ticker', 'CompanyName']
)

correlation_cols = [col for col in correlation_metrics.columns
                   if col not in ['Ticker', 'CompanyName']]
correlation_matrix = correlation_metrics[correlation_cols].corr()

plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix of Company Metrics')
plt.tight_layout()
plt.show()
matriz de correlación

gráficos de barras
Bar graphs

gráficos de líneas
Line graphs

Espero haberles brindado una idea de un posible caso de uso analítico con Pandas y Apache Iceberg. Hay otras variantes que probablemente exploraré próximamente, empezando por Apache Spark e Iceberg, donde los datos de origen se leen desde Iceberg, las agregaciones y los cálculos se realizan en Spark, y la salida resultante se vuelve a escribir en Iceberg. Gracias por leer.

pandas saludando
Pandas waving bye


+1 508-203-1492

Bedford, Massachusetts 01730

bottom of page