top of page

Apache Iceberg e Pandas Analytics: Parte III

  • Foto do escritor: Claude Paugh
    Claude Paugh
  • 11 de mai.
  • 5 min de leitura

Atualizado: 20 de mai.

Os dois artigos anteriores abordaram a avaliação do Apache Iceberg e seus recursos, bem como o uso do PyIceberg para criar objetos e carregar dados. Este artigo se concentrará na extração de dados e no uso de dataframes do Pandas para criar análises.

Análises de Pandas
Pandas Analytics

Python Pandas e Apache Iceberg

Em termos de pacotes Python, o Pandas tem sido um dos mais populares desde o seu lançamento inicial. Seu "ponto ideal" era fornecer matrizes multidimensionais para representar dados, também conhecidas como tabelas, e suportar diversos cálculos/agregações, do básico ao avançado, comumente usados por aqueles que se formaram ou simplesmente não gostavam de planilhas. Com o desenvolvimento da Ciência de Dados ao longo dos anos, a popularidade do Pandas também cresceu.


O Pandas utiliza bastante o pacote Python Numpy para matrizes, tipos e funções matemáticas. Você também pode filtrar, fatiar, subdividir e estender os dataframes usando diversas técnicas. Se você planejar corretamente, poderá efetivamente ter vários DataFrames do Pandas imitando cubos multidimensionais. Não necessariamente de forma coesa, mas funcional.

pandas lendo arquivos
Pandas reading files

O Pandas possui vários métodos " read_ " na classe que permitem receber uma entrada e criar um dataframe a partir desses dados. Por exemplo, CSV, JSON, Parquet, arquivos delimitados, instruções SQL/conjuntos de resultados, Apache AVRO e ORC, HDF5, buffers, arquivos pickle, HTML e listas ou dicionários em Python, para citar algumas fontes que o Pandas pode ler e processar dados.


Agregados Nomeados Pandas

Depois de definir e carregar as tabelas Iceberg, no meu post anterior , realizar um exercício de análise com o Pandas pareceu um bom teste de algo que pudesse representar um caso de uso real. Comecei com os princípios básicos de obtenção e recuperação de dados:

from get_catalog import get_catalog
import pandas as pd
from pyiceberg.expressions import GreaterThan, GreaterThanOrEqual, EqualTo, NotEqualTo
import matplotlib.pyplot as plt
import seaborn as sns

# Pandas output display formatting
pd.options.display.max_rows = 50
pd.options.display.max_columns = 50
pd.options.display.width = 1000
pd.options.display.colheader_justify = 'center'
pd.options.display.precision = 4
pd.options.display.float_format = '{:,.2f}'.format

# Get catalog
catalog = get_catalog()

# List tables in docs namespace
catalog.list_tables("docs")

# Load tables from Iceberg docs namespace
company = catalog.load_table("docs.company")
ratios_ttm = catalog.load_table("docs.ratios_ttm")
forecasts = catalog.load_table("docs.forecasts")

# Apply filters, convert dates, and convert to pandas
company_tech = company.scan(row_filter=(EqualTo('Sector', 'Technology')))
company_tech_usd = company_tech.filter(EqualTo('Currency','USD'))
company_tech_use_df = company_tech_usd.to_pandas()
company_df = company.scan().to_pandas()

A linha de comentário " Aplicar filtros... " acima oferece algo a ser destacado no que diz respeito ao PyIceberg. Você pode ver na importação de pyiceberg.expressions , que alguns tipos de expressão foram importados para uso. São essencialmente quatro expressões usadas para avaliação numérica e de strings. O primeiro row_filter visa apenas o setor de tecnologia, usando a expressão "EqualTo" da importação. O segundo filtro limita as linhas apenas à moeda USD. A sintaxe é ligeiramente diferente porque os objetos usados são diferentes. O primeiro objeto é uma tabela sendo escaneada e filtrada, enquanto o segundo é um objeto DataScan sendo filtrado.

pandas calculando totais
Pandas calculating totals

A próxima seção faz a conversão de tipos para pandas, e os dataframes são mesclados para criar novos objetos. Usei um filtro Pandas no objeto company_df , apenas para contrastar com o filtro acima usando os métodos pyiceberg.expressions . Cada coluna no dataframe é tipada como um objeto, portanto, para fazer cálculos numéricos, preciso converter as colunas para usar um tipo numérico (float). Também realizo outra varredura na tabela "forecasts" do Iceberg, a fim de filtrar por 'Dividend_Per_Share_Growth' e, em seguida, converter a saída para Pandas.

# Pandas filters; set date datatypes
company_df = company_df[company_df['Currency'] == 'USD']
company_df['AsOfDate'] = pd.to_datetime(company_df['AsOfDate'], format='%Y-%m-%d').dt.date

ratios_ttm_df = ratios_ttm.scan(row_filter=GreaterThan('Cash_Per_Share', 0.01)).to_pandas()
ratios_ttm_df['AsOfDate'] = pd.to_datetime(ratios_ttm_df['AsOfDate'], format='%Y-%m-%d').dt.date

forecasts_df = forecasts.scan(row_filter=GreaterThanOrEqual('Dividend_Per_Share_Growth', 0.000)).to_pandas()
forecasts_df['AsOfDate'] = pd.to_datetime(forecasts_df['AsOfDate'], format='%Y-%m-%d').dt.date

# Merge tables and clean up duplicates
company_ratios_df = pd.merge(company_df, ratios_ttm_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_forecast_df = pd.merge(company_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_ratios_forecast_df = pd.merge(company_ratios_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)

A última seção do código faz uma mesclagem de dataframes usando a coluna "Ticker" nos dataframes e remove duplicatas usando a mesma coluna como subconjunto. Eu redefino o índice, caso contrário, minhas próximas operações não funcionarão. Para usuários de SQL, a mesclagem em uma coluna comum usando "how='inner'" é funcionalmente semelhante a uma junção interna entre duas tabelas em um RDBMS. Os resultados dessas operações fornecerão um conjunto de dados estendido para uso posterior.


A próxima seção é apenas mais uma digitação de colunas para float, então vou pular essa seção e passar para os "Agregados Nomeados" do Pandas. Como você pode ver abaixo, usando o alias do Pandas da importação, "pd", agregados nomeados são criados.

# company + ratios + forecast named aggregates
company_cash_agg = pd.NamedAgg(column='Cash_Per_Share', aggfunc='mean')
company_tangible_agg = pd.NamedAgg(column='Tangible_Book_Value_Per_Share', aggfunc='median')
company_free_cash_agg = pd.NamedAgg(column='Free_Cash_Flow_Per_Share', aggfunc='mean')
company_discounted_agg = pd.NamedAgg(column='Discounted_Cash_Flow', aggfunc='mean')
company_yield_agg = pd.NamedAgg(column='Free_Cash_Flow_Yield_PCT', aggfunc='mean')
company_pe_agg = pd.NamedAgg(column='Price_To_Earnings', aggfunc='median')
company_roe_agg = pd.NamedAgg(column='Return_On_Equity_PCT', aggfunc='median')

Os resultados para cada agregado são calculados usando um dataframe contendo a(s) coluna(s) e adicionando um group by. Os resultados nomeados, por exemplo, "mean_10yr_cash", etc., serão os nomes das colunas para o dataframe agregado resultante. Você notará abaixo que é possível obter vários resultados agregados nomeados a partir do único dataframe group by; basicamente, uma única instrução.

# Get results for each aggregate
company_cash_all = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(mean_10yr_cash=company_cash_agg,                          mean_10yr_free_cash=company_free_cash_agg,
mean_10yr_discounted_cash=company_discounted_agg,
mean_10yr_cash_yield=company_yield_agg)

company_metrics = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(median_tangible_bv_share=company_tangible_agg, median_pe=company_pe_agg, median_roe_pct=company_roe_agg)

# Non named aggregates
df_div = forecasts_df.groupby('CalendarYear')['Dividend_Per_Share_Growth'].agg(['mean', 'median', 'quantile'])
df_sga = forecasts_df.groupby('CalendarYear')['Debt_Growth_PCT'].agg(['mean', 'median', 'quantile'])

Resultados:

Agregados de caixa da empresa
Company Cash Aggregates
agregados de métricas da empresa
Company Metrics Aggregates













dividendos por ação agregados de crescimento
Dividend Per Share Growth
agregados percentuais de crescimento da dívida
Debt Growth PCT














Resultados do Graphing Pandas

Decidi ilustrar melhor o exemplo de saída agregada, representando graficamente os resultados usando matplotlib e seaborn . Às vezes, um visual fornece informações contextuais que são apenas implícitas ao analisar os cálculos de saída.

pandas criando gráficos
Pandas creating graphs

O bloco de código abaixo contém comentários que lhe darão o contexto em que os conjuntos de dados resultantes estão sendo plotados. É razoavelmente simples, se você tiver algum conhecimento de matplotlib; caso contrário, pode ser necessário consultar alguma sintaxe. Se você sabe como criar gráficos, é um pouco intuitivo, já que as bibliotecas utilizam a terminologia usada em gráficos.

# 1. Dividend Growth and Debt Growth Over Time
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Dividend Growth Plot
df_div.plot(ax=ax1, marker='o')
ax1.set_title('Dividend Per Share Growth Over Time')
ax1.set_xlabel('Calendar Year')
ax1.set_ylabel('Growth Rate')
ax1.legend(['Mean', 'Median', 'Quantile'])
ax1.grid(True)

# Debt Growth Plot
df_sga.plot(ax=ax2, marker='o')
ax2.set_title('Debt Growth Percentage Over Time')
ax2.set_xlabel('Calendar Year')
ax2.set_ylabel('Growth Rate (%)')
ax2.legend(['Mean', 'Median', 'Quantile'])
ax2.grid(True)

plt.tight_layout()
plt.show()

# 2. Company Metrics Distribution
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

# Cash Metrics
sns.boxplot(data=company_cash_all, ax=ax1)
ax1.set_title('Distribution of Cash Metrics')
ax1.tick_params(axis='x', rotation=45)

# Company Performance Metrics
sns.boxplot(data=company_metrics, ax=ax2)
ax2.set_title('Distribution of Company Performance Metrics')
ax2.tick_params(axis='x', rotation=45)

# Top 10 Companies by Free Cash Flow Yield
top_cash_yield = company_cash_all.nlargest(10, 'mean_10yr_cash_yield')
sns.barplot(data=top_cash_yield.reset_index(), x='CompanyName', y='mean_10yr_cash_yield', ax=ax3)
ax3.set_title('Top 10 Companies by Free Cash Flow Yield')
ax3.tick_params(axis='x', rotation=45)

# Top 10 Companies by ROE
top_roe = company_metrics.nlargest(10, 'median_roe_pct')
sns.barplot(data=top_roe.reset_index(), x='CompanyName', y='median_roe_pct', ax=ax4)
ax4.set_title('Top 10 Companies by Return on Equity')
ax4.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# 3. Correlation Matrix of Key Metrics; Combine relevant metrics
correlation_metrics = pd.merge(
    company_cash_all.reset_index(),
    company_metrics.reset_index(),
    on=['Ticker', 'CompanyName']
)

correlation_cols = [col for col in correlation_metrics.columns
                   if col not in ['Ticker', 'CompanyName']]
correlation_matrix = correlation_metrics[correlation_cols].corr()

plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix of Company Metrics')
plt.tight_layout()
plt.show()
matriz de correlação

gráficos de barras
Bar graphs

gráficos de linha
Line graphs

Espero ter conseguido fornecer alguns insights sobre um possível caso de uso analítico com o Pandas e o Apache Iceberg. Há outras variações que provavelmente explorarei em um futuro próximo, começando com o Apache Spark e o Iceberg, onde os dados de origem são lidos do Iceberg, agregados e cálculos ocorrem no Spark e a saída resultante é gravada de volta no Iceberg. Obrigado pela leitura.

pandas acenando tchau
Pandas waving bye


+1 508-203-1492

Bedford, MA 01730

bottom of page