Apache Iceberg 和 Pandas Analytics:第三部分
- Claude Paugh
- 5月11日
- 讀畢需時 5 分鐘
已更新:8月18日
前两篇文章主要介绍了 Apache Iceberg 及其功能的评估,以及如何使用 PyIceberg 创建对象和加载数据。本文将重点介绍如何导出数据,以及如何使用 Pandas DataFrame 进行分析。

Python Pandas 和 Apache Iceberg
就 Python 软件包而言,Pandas 自首次发布以来就一直是最受欢迎的软件包之一。它的“亮点”在于提供多维数组(即表格)来表示数据,并支持许多从基础到高级的计算/聚合操作,这些操作通常被那些“毕业”或不喜欢电子表格的人使用。随着数据科学多年来的发展,Pandas 的受欢迎程度也随之增长。
Pandas 大量使用了 numpy Python 包来处理数组、类型和数学函数。你还可以使用各种技术对数据帧进行过滤、切片、子集化和扩展。如果你规划得当,可以有效地让多个 Pandas 数据帧模拟多维立方体。无需保证其一致性,但功能上却非常出色。

Pandas 类中有各种“ read_ ”方法,可用于接收输入并根据该输入数据创建数据框。例如,Pandas 可以读取并提取数据的来源包括 CSV、JSON、Parquet、分隔文件、SQL 语句/结果集、Apache AVRO & ORC、HDF5、缓冲区、pickle 文件、html 以及 Python 中的列表或字典。
Pandas 命名聚合
在我之前的文章中,定义并加载了 Iceberg 表之后,用 Pandas 进行分析练习似乎是一个很好的测试,可以代表实际用例。我从获取和检索数据的基础知识开始:
from get_catalog import get_catalog
import pandas as pd
from pyiceberg.expressions import GreaterThan, GreaterThanOrEqual, EqualTo, NotEqualTo
import matplotlib.pyplot as plt
import seaborn as sns
# Pandas output display formatting
pd.options.display.max_rows = 50
pd.options.display.max_columns = 50
pd.options.display.width = 1000
pd.options.display.colheader_justify = 'center'
pd.options.display.precision = 4
pd.options.display.float_format = '{:,.2f}'.format
# Get catalog
catalog = get_catalog()
# List tables in docs namespace
catalog.list_tables("docs")
# Load tables from Iceberg docs namespace
company = catalog.load_table("docs.company")
ratios_ttm = catalog.load_table("docs.ratios_ttm")
forecasts = catalog.load_table("docs.forecasts")
# Apply filters, convert dates, and convert to pandas
company_tech = company.scan(row_filter=(EqualTo('Sector', 'Technology')))
company_tech_usd = company_tech.filter(EqualTo('Currency','USD'))
company_tech_use_df = company_tech_usd.to_pandas()
company_df = company.scan().to_pandas()上面的“应用过滤器... ”注释行指出了 PyIceberg 的一些不足之处。你可以从导入以下内容中看到 pyiceberg.expressions中已导入一些表达式类型以供使用。它们本质上是用于数值和字符串求值的四种表达式。第一个row_filter通过使用导入的“EqualTo”表达式,仅针对科技行业。第二个过滤器将行限制为仅限美元货币。由于使用的对象不同,语法略有不同。第一个对象是被扫描和过滤的表,而第二个对象是被过滤的 DataScan 对象。

下一节将对 Pandas 进行类型转换,并将数据框合并以创建新对象。我在company_df对象上使用了 Pandas 过滤器,只是为了与上面使用pyiceberg.expressions方法的过滤器进行对比。数据框中的每一列都被定义为对象,因此为了进行数值计算,我需要将列转换为数值类型(浮点型)。我还对 Iceberg 的“forecasts”表进行了另一次扫描,以便筛选出“Dividend_Per_Share_Growth” ,然后将输出转换为 Pandas 格式。
# Pandas filters; set date datatypes
company_df = company_df[company_df['Currency'] == 'USD']
company_df['AsOfDate'] = pd.to_datetime(company_df['AsOfDate'], format='%Y-%m-%d').dt.date
ratios_ttm_df = ratios_ttm.scan(row_filter=GreaterThan('Cash_Per_Share', 0.01)).to_pandas()
ratios_ttm_df['AsOfDate'] = pd.to_datetime(ratios_ttm_df['AsOfDate'], format='%Y-%m-%d').dt.date
forecasts_df = forecasts.scan(row_filter=GreaterThanOrEqual('Dividend_Per_Share_Growth', 0.000)).to_pandas()
forecasts_df['AsOfDate'] = pd.to_datetime(forecasts_df['AsOfDate'], format='%Y-%m-%d').dt.date
# Merge tables and clean up duplicates
company_ratios_df = pd.merge(company_df, ratios_ttm_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_forecast_df = pd.merge(company_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_ratios_forecast_df = pd.merge(company_ratios_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)代码的最后一部分使用数据框中的“Ticker”列进行数据框合并,并使用相同的列作为子集删除重复项。我重置了索引,否则接下来的操作将无法正常工作。对于 SQL 用户来说,使用“how='inner'”对公共列进行合并的功能类似于 RDBMS 中两个表之间的内连接。这些操作的结果将提供一个扩展数据集以供进一步使用。
下一节只是关于如何将列类型转换为浮动类型,所以我将跳过这一节,直接进入 Pandas 的“命名聚合”。如下所示,使用导入的 Pandas 别名“pd”,创建了命名聚合。
# company + ratios + forecast named aggregates
company_cash_agg = pd.NamedAgg(column='Cash_Per_Share', aggfunc='mean')
company_tangible_agg = pd.NamedAgg(column='Tangible_Book_Value_Per_Share', aggfunc='median')
company_free_cash_agg = pd.NamedAgg(column='Free_Cash_Flow_Per_Share', aggfunc='mean')
company_discounted_agg = pd.NamedAgg(column='Discounted_Cash_Flow', aggfunc='mean')
company_yield_agg = pd.NamedAgg(column='Free_Cash_Flow_Yield_PCT', aggfunc='mean')
company_pe_agg = pd.NamedAgg(column='Price_To_Earnings', aggfunc='median')
company_roe_agg = pd.NamedAgg(column='Return_On_Equity_PCT', aggfunc='median')每个聚合的结果都是使用包含相应列的数据框并添加分组依据来计算的。这些命名结果(例如“mean_10yr_cash”等)将成为最终聚合数据框的列名。您将在下文中注意到,可以从单个数据框的分组依据中获得多个命名聚合结果;基本上只需一条语句即可。
# Get results for each aggregate
company_cash_all = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(mean_10yr_cash=company_cash_agg, mean_10yr_free_cash=company_free_cash_agg,
mean_10yr_discounted_cash=company_discounted_agg,
mean_10yr_cash_yield=company_yield_agg)
company_metrics = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(median_tangible_bv_share=company_tangible_agg, median_pe=company_pe_agg, median_roe_pct=company_roe_agg)
# Non named aggregates
df_div = forecasts_df.groupby('CalendarYear')['Dividend_Per_Share_Growth'].agg(['mean', 'median', 'quantile'])
df_sga = forecasts_df.groupby('CalendarYear')['Debt_Growth_PCT'].agg(['mean', 'median', 'quantile'])结果:




绘制 Pandas 结果
为了更好地说明示例聚合输出,我决定使用matplotlib和seaborn绘制结果图表。有时,可视化图表提供的上下文信息只有在查看输出计算时才会隐含。

下面的代码块包含注释,可以为您提供绘制结果数据集的上下文。如果您对 matplotlib 有所了解,那么它相当直观;否则,您可能需要查找一些语法。如果您知道如何创建图表,那么它相当直观,因为该库使用了图表绘制中使用的术语。
# 1. Dividend Growth and Debt Growth Over Time
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# Dividend Growth Plot
df_div.plot(ax=ax1, marker='o')
ax1.set_title('Dividend Per Share Growth Over Time')
ax1.set_xlabel('Calendar Year')
ax1.set_ylabel('Growth Rate')
ax1.legend(['Mean', 'Median', 'Quantile'])
ax1.grid(True)
# Debt Growth Plot
df_sga.plot(ax=ax2, marker='o')
ax2.set_title('Debt Growth Percentage Over Time')
ax2.set_xlabel('Calendar Year')
ax2.set_ylabel('Growth Rate (%)')
ax2.legend(['Mean', 'Median', 'Quantile'])
ax2.grid(True)
plt.tight_layout()
plt.show()
# 2. Company Metrics Distribution
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Cash Metrics
sns.boxplot(data=company_cash_all, ax=ax1)
ax1.set_title('Distribution of Cash Metrics')
ax1.tick_params(axis='x', rotation=45)
# Company Performance Metrics
sns.boxplot(data=company_metrics, ax=ax2)
ax2.set_title('Distribution of Company Performance Metrics')
ax2.tick_params(axis='x', rotation=45)
# Top 10 Companies by Free Cash Flow Yield
top_cash_yield = company_cash_all.nlargest(10, 'mean_10yr_cash_yield')
sns.barplot(data=top_cash_yield.reset_index(), x='CompanyName', y='mean_10yr_cash_yield', ax=ax3)
ax3.set_title('Top 10 Companies by Free Cash Flow Yield')
ax3.tick_params(axis='x', rotation=45)
# Top 10 Companies by ROE
top_roe = company_metrics.nlargest(10, 'median_roe_pct')
sns.barplot(data=top_roe.reset_index(), x='CompanyName', y='median_roe_pct', ax=ax4)
ax4.set_title('Top 10 Companies by Return on Equity')
ax4.tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 3. Correlation Matrix of Key Metrics; Combine relevant metrics
correlation_metrics = pd.merge(
company_cash_all.reset_index(),
company_metrics.reset_index(),
on=['Ticker', 'CompanyName']
)
correlation_cols = [col for col in correlation_metrics.columns
if col not in ['Ticker', 'CompanyName']]
correlation_matrix = correlation_metrics[correlation_cols].corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix of Company Metrics')
plt.tight_layout()
plt.show()


希望以上内容能够帮助大家深入了解 Pandas 和 Apache Iceberg 的潜在分析用例。近期我可能会探索其他一些变体,例如 Apache Spark 和 Iceberg 的组合:从 Iceberg 读取源数据,在 Spark 中进行聚合和计算,最后将结果写回 Iceberg。感谢您的阅读。


