Apache Iceberg e Pandas Analytics: Parte III
- Claude Paugh
- 11 de mai.
- 5 min de leitura
Atualizado: 20 de mai.
Os dois artigos anteriores abordaram a avaliação do Apache Iceberg e seus recursos, bem como o uso do PyIceberg para criar objetos e carregar dados. Este artigo se concentrará na extração de dados e no uso de dataframes do Pandas para criar análises.

Python Pandas e Apache Iceberg
Em termos de pacotes Python, o Pandas tem sido um dos mais populares desde o seu lançamento inicial. Seu "ponto ideal" era fornecer matrizes multidimensionais para representar dados, também conhecidas como tabelas, e suportar diversos cálculos/agregações, do básico ao avançado, comumente usados por aqueles que se formaram ou simplesmente não gostavam de planilhas. Com o desenvolvimento da Ciência de Dados ao longo dos anos, a popularidade do Pandas também cresceu.
O Pandas utiliza bastante o pacote Python Numpy para matrizes, tipos e funções matemáticas. Você também pode filtrar, fatiar, subdividir e estender os dataframes usando diversas técnicas. Se você planejar corretamente, poderá efetivamente ter vários DataFrames do Pandas imitando cubos multidimensionais. Não necessariamente de forma coesa, mas funcional.

O Pandas possui vários métodos " read_ " na classe que permitem receber uma entrada e criar um dataframe a partir desses dados. Por exemplo, CSV, JSON, Parquet, arquivos delimitados, instruções SQL/conjuntos de resultados, Apache AVRO e ORC, HDF5, buffers, arquivos pickle, HTML e listas ou dicionários em Python, para citar algumas fontes que o Pandas pode ler e processar dados.
Agregados Nomeados Pandas
Depois de definir e carregar as tabelas Iceberg, no meu post anterior , realizar um exercício de análise com o Pandas pareceu um bom teste de algo que pudesse representar um caso de uso real. Comecei com os princípios básicos de obtenção e recuperação de dados:
from get_catalog import get_catalog
import pandas as pd
from pyiceberg.expressions import GreaterThan, GreaterThanOrEqual, EqualTo, NotEqualTo
import matplotlib.pyplot as plt
import seaborn as sns
# Pandas output display formatting
pd.options.display.max_rows = 50
pd.options.display.max_columns = 50
pd.options.display.width = 1000
pd.options.display.colheader_justify = 'center'
pd.options.display.precision = 4
pd.options.display.float_format = '{:,.2f}'.format
# Get catalog
catalog = get_catalog()
# List tables in docs namespace
catalog.list_tables("docs")
# Load tables from Iceberg docs namespace
company = catalog.load_table("docs.company")
ratios_ttm = catalog.load_table("docs.ratios_ttm")
forecasts = catalog.load_table("docs.forecasts")
# Apply filters, convert dates, and convert to pandas
company_tech = company.scan(row_filter=(EqualTo('Sector', 'Technology')))
company_tech_usd = company_tech.filter(EqualTo('Currency','USD'))
company_tech_use_df = company_tech_usd.to_pandas()
company_df = company.scan().to_pandas()
A linha de comentário " Aplicar filtros... " acima oferece algo a ser destacado no que diz respeito ao PyIceberg. Você pode ver na importação de pyiceberg.expressions , que alguns tipos de expressão foram importados para uso. São essencialmente quatro expressões usadas para avaliação numérica e de strings. O primeiro row_filter visa apenas o setor de tecnologia, usando a expressão "EqualTo" da importação. O segundo filtro limita as linhas apenas à moeda USD. A sintaxe é ligeiramente diferente porque os objetos usados são diferentes. O primeiro objeto é uma tabela sendo escaneada e filtrada, enquanto o segundo é um objeto DataScan sendo filtrado.

A próxima seção faz a conversão de tipos para pandas, e os dataframes são mesclados para criar novos objetos. Usei um filtro Pandas no objeto company_df , apenas para contrastar com o filtro acima usando os métodos pyiceberg.expressions . Cada coluna no dataframe é tipada como um objeto, portanto, para fazer cálculos numéricos, preciso converter as colunas para usar um tipo numérico (float). Também realizo outra varredura na tabela "forecasts" do Iceberg, a fim de filtrar por 'Dividend_Per_Share_Growth' e, em seguida, converter a saída para Pandas.
# Pandas filters; set date datatypes
company_df = company_df[company_df['Currency'] == 'USD']
company_df['AsOfDate'] = pd.to_datetime(company_df['AsOfDate'], format='%Y-%m-%d').dt.date
ratios_ttm_df = ratios_ttm.scan(row_filter=GreaterThan('Cash_Per_Share', 0.01)).to_pandas()
ratios_ttm_df['AsOfDate'] = pd.to_datetime(ratios_ttm_df['AsOfDate'], format='%Y-%m-%d').dt.date
forecasts_df = forecasts.scan(row_filter=GreaterThanOrEqual('Dividend_Per_Share_Growth', 0.000)).to_pandas()
forecasts_df['AsOfDate'] = pd.to_datetime(forecasts_df['AsOfDate'], format='%Y-%m-%d').dt.date
# Merge tables and clean up duplicates
company_ratios_df = pd.merge(company_df, ratios_ttm_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_forecast_df = pd.merge(company_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_ratios_forecast_df = pd.merge(company_ratios_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
A última seção do código faz uma mesclagem de dataframes usando a coluna "Ticker" nos dataframes e remove duplicatas usando a mesma coluna como subconjunto. Eu redefino o índice, caso contrário, minhas próximas operações não funcionarão. Para usuários de SQL, a mesclagem em uma coluna comum usando "how='inner'" é funcionalmente semelhante a uma junção interna entre duas tabelas em um RDBMS. Os resultados dessas operações fornecerão um conjunto de dados estendido para uso posterior.
A próxima seção é apenas mais uma digitação de colunas para float, então vou pular essa seção e passar para os "Agregados Nomeados" do Pandas. Como você pode ver abaixo, usando o alias do Pandas da importação, "pd", agregados nomeados são criados.
# company + ratios + forecast named aggregates
company_cash_agg = pd.NamedAgg(column='Cash_Per_Share', aggfunc='mean')
company_tangible_agg = pd.NamedAgg(column='Tangible_Book_Value_Per_Share', aggfunc='median')
company_free_cash_agg = pd.NamedAgg(column='Free_Cash_Flow_Per_Share', aggfunc='mean')
company_discounted_agg = pd.NamedAgg(column='Discounted_Cash_Flow', aggfunc='mean')
company_yield_agg = pd.NamedAgg(column='Free_Cash_Flow_Yield_PCT', aggfunc='mean')
company_pe_agg = pd.NamedAgg(column='Price_To_Earnings', aggfunc='median')
company_roe_agg = pd.NamedAgg(column='Return_On_Equity_PCT', aggfunc='median')
Os resultados para cada agregado são calculados usando um dataframe contendo a(s) coluna(s) e adicionando um group by. Os resultados nomeados, por exemplo, "mean_10yr_cash", etc., serão os nomes das colunas para o dataframe agregado resultante. Você notará abaixo que é possível obter vários resultados agregados nomeados a partir do único dataframe group by; basicamente, uma única instrução.
# Get results for each aggregate
company_cash_all = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(mean_10yr_cash=company_cash_agg, mean_10yr_free_cash=company_free_cash_agg,
mean_10yr_discounted_cash=company_discounted_agg,
mean_10yr_cash_yield=company_yield_agg)
company_metrics = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(median_tangible_bv_share=company_tangible_agg, median_pe=company_pe_agg, median_roe_pct=company_roe_agg)
# Non named aggregates
df_div = forecasts_df.groupby('CalendarYear')['Dividend_Per_Share_Growth'].agg(['mean', 'median', 'quantile'])
df_sga = forecasts_df.groupby('CalendarYear')['Debt_Growth_PCT'].agg(['mean', 'median', 'quantile'])
Resultados:




Resultados do Graphing Pandas
Decidi ilustrar melhor o exemplo de saída agregada, representando graficamente os resultados usando matplotlib e seaborn . Às vezes, um visual fornece informações contextuais que são apenas implícitas ao analisar os cálculos de saída.

O bloco de código abaixo contém comentários que lhe darão o contexto em que os conjuntos de dados resultantes estão sendo plotados. É razoavelmente simples, se você tiver algum conhecimento de matplotlib; caso contrário, pode ser necessário consultar alguma sintaxe. Se você sabe como criar gráficos, é um pouco intuitivo, já que as bibliotecas utilizam a terminologia usada em gráficos.
# 1. Dividend Growth and Debt Growth Over Time
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# Dividend Growth Plot
df_div.plot(ax=ax1, marker='o')
ax1.set_title('Dividend Per Share Growth Over Time')
ax1.set_xlabel('Calendar Year')
ax1.set_ylabel('Growth Rate')
ax1.legend(['Mean', 'Median', 'Quantile'])
ax1.grid(True)
# Debt Growth Plot
df_sga.plot(ax=ax2, marker='o')
ax2.set_title('Debt Growth Percentage Over Time')
ax2.set_xlabel('Calendar Year')
ax2.set_ylabel('Growth Rate (%)')
ax2.legend(['Mean', 'Median', 'Quantile'])
ax2.grid(True)
plt.tight_layout()
plt.show()
# 2. Company Metrics Distribution
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Cash Metrics
sns.boxplot(data=company_cash_all, ax=ax1)
ax1.set_title('Distribution of Cash Metrics')
ax1.tick_params(axis='x', rotation=45)
# Company Performance Metrics
sns.boxplot(data=company_metrics, ax=ax2)
ax2.set_title('Distribution of Company Performance Metrics')
ax2.tick_params(axis='x', rotation=45)
# Top 10 Companies by Free Cash Flow Yield
top_cash_yield = company_cash_all.nlargest(10, 'mean_10yr_cash_yield')
sns.barplot(data=top_cash_yield.reset_index(), x='CompanyName', y='mean_10yr_cash_yield', ax=ax3)
ax3.set_title('Top 10 Companies by Free Cash Flow Yield')
ax3.tick_params(axis='x', rotation=45)
# Top 10 Companies by ROE
top_roe = company_metrics.nlargest(10, 'median_roe_pct')
sns.barplot(data=top_roe.reset_index(), x='CompanyName', y='median_roe_pct', ax=ax4)
ax4.set_title('Top 10 Companies by Return on Equity')
ax4.tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 3. Correlation Matrix of Key Metrics; Combine relevant metrics
correlation_metrics = pd.merge(
company_cash_all.reset_index(),
company_metrics.reset_index(),
on=['Ticker', 'CompanyName']
)
correlation_cols = [col for col in correlation_metrics.columns
if col not in ['Ticker', 'CompanyName']]
correlation_matrix = correlation_metrics[correlation_cols].corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix of Company Metrics')
plt.tight_layout()
plt.show()



Espero ter conseguido fornecer alguns insights sobre um possível caso de uso analítico com o Pandas e o Apache Iceberg. Há outras variações que provavelmente explorarei em um futuro próximo, começando com o Apache Spark e o Iceberg, onde os dados de origem são lidos do Iceberg, agregados e cálculos ocorrem no Spark e a saída resultante é gravada de volta no Iceberg. Obrigado pela leitura.
