top of page

Apache Iceberg と Pandas Analytics: パート III

更新日:5 日前

前回までの2つの記事では、Apache Icebergとその機能の評価、そしてPyIcebergを使ったオブジェクトの作成とデータの読み込みについて説明しました。今回は、データの取り出しと、Pandasデータフレームを使った分析の作成に焦点を当てます。


パンダアナリティクス
Pandas Analytics

Python Pandas と Apache Iceberg

Pythonパッケージの中で、Pandasはリリース当初から高い人気を誇っています。その「スイートスポット」は、データを表す多次元配列(いわゆるテーブル)を提供し、スプレッドシートを卒業した人や、スプレッドシートが苦手な人がよく使う、基本から高度な計算・集計まで幅広くサポートしていることです。データサイエンスが長年にわたり発展するにつれ、Pandasの人気も高まっています。


Pandasは、配列、型、数学関数をPythonのnumpyパッケージから大量に取得しています。また、様々な手法を用いてデータフレームをフィルタリング、スライス、サブセット化、拡張することも可能です。適切に計画すれば、複数のPandas DataFrameを効果的に多次元キューブに模倣することも可能です。必ずしも統一性があるわけではありませんが、機能的には可能です。

パンダがファイルを読み取る
Pandas reading files

Pandasクラスには様々な「 read_ 」メソッドがあり、入力データを受け取り、そのデータからデータフレームを作成することができます。例えば、CSV、JSON、Parquet、区切りファイル、SQL文/結果セット、Apache AVRO & ORC、HDF5、バッファ、ピクルファイル、HTML、Pythonのリストや辞書など、Pandasが読み込んでデータを処理できるソースは数多くあります。


パンダの名前付き集合体

前回の投稿でIcebergテーブルを定義してロードした後、Pandasを使った分析演習は、実際のユースケースを再現できる良いテストになると思いました。まずは、データの取得と取り出しの基本から始めました。

from get_catalog import get_catalog
import pandas as pd
from pyiceberg.expressions import GreaterThan, GreaterThanOrEqual, EqualTo, NotEqualTo
import matplotlib.pyplot as plt
import seaborn as sns

# Pandas output display formatting
pd.options.display.max_rows = 50
pd.options.display.max_columns = 50
pd.options.display.width = 1000
pd.options.display.colheader_justify = 'center'
pd.options.display.precision = 4
pd.options.display.float_format = '{:,.2f}'.format

# Get catalog
catalog = get_catalog()

# List tables in docs namespace
catalog.list_tables("docs")

# Load tables from Iceberg docs namespace
company = catalog.load_table("docs.company")
ratios_ttm = catalog.load_table("docs.ratios_ttm")
forecasts = catalog.load_table("docs.forecasts")

# Apply filters, convert dates, and convert to pandas
company_tech = company.scan(row_filter=(EqualTo('Sector', 'Technology')))
company_tech_usd = company_tech.filter(EqualTo('Currency','USD'))
company_tech_use_df = company_tech_usd.to_pandas()
company_df = company.scan().to_pandas()

上記の「フィルターを適用... 」というコメント行は、PyIcebergに関して注目すべき点を示しています。 pyiceberg.expressionsには、いくつかの式タイプがインポート済みで、使用できるようになっています。これらは基本的に、数値と文字列の両方の評価に使用される4つの式です。最初のrow_filterは、インポートした「EqualTo」式を使用して、テクノロジーセクターのみを対象としています。2番目のフィルターは、行をUSD通貨のみに制限します。使用するオブジェクトが異なるため、構文も若干異なります。最初のオブジェクトはスキャンおよびフィルタリングされるテーブルであり、2番目のオブジェクトはフィルタリングされるDataScanオブジェクトです。

合計を計算するパンダ
Pandas calculating totals

次のセクションでは、Pandasの型変換を行い、データフレームをマージして新しいオブジェクトを作成します。上記のpyiceberg.expressionsメソッドを使用したフィルターとは対照的に、 company_dfオブジェクトにPandasフィルターを使用しました。データフレームのすべての列はオブジェクト型であるため、数値計算を行うには、列を数値型(float)に変換する必要があります。また、Icebergの「forecasts」テーブルを再度スキャンして「Dividend_Per_Share_Growth」をフィルターし、出力をPandasに変換します。

# Pandas filters; set date datatypes
company_df = company_df[company_df['Currency'] == 'USD']
company_df['AsOfDate'] = pd.to_datetime(company_df['AsOfDate'], format='%Y-%m-%d').dt.date

ratios_ttm_df = ratios_ttm.scan(row_filter=GreaterThan('Cash_Per_Share', 0.01)).to_pandas()
ratios_ttm_df['AsOfDate'] = pd.to_datetime(ratios_ttm_df['AsOfDate'], format='%Y-%m-%d').dt.date

forecasts_df = forecasts.scan(row_filter=GreaterThanOrEqual('Dividend_Per_Share_Growth', 0.000)).to_pandas()
forecasts_df['AsOfDate'] = pd.to_datetime(forecasts_df['AsOfDate'], format='%Y-%m-%d').dt.date

# Merge tables and clean up duplicates
company_ratios_df = pd.merge(company_df, ratios_ttm_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_forecast_df = pd.merge(company_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)
company_ratios_forecast_df = pd.merge(company_ratios_df, forecasts_df, how='inner', on='Ticker').drop_duplicates(subset=['Ticker']).reset_index(drop=True)

コードの最後のセクションでは、データフレームの「Ticker」列を使用してデータフレームをマージし、同じ列をサブセットとして重複を削除します。インデックスをリセットしないと、以降の操作が機能しません。SQLユーザーにとって、「how='inner'」を使用した共通列のマージは、RDBMSの2つのテーブル間の内部結合と機能的に似ています。これらの操作の結果は、将来的に使用できる拡張データセットを提供します。


次のセクションでは、float の列定義についてさらに詳しく説明するので、このセクションは飛ばして、Pandas の「名前付き集計」に進みます。以下に示すように、インポート時に Pandas のエイリアス「pd」を使用して、名前付き集計が作成されます。

# company + ratios + forecast named aggregates
company_cash_agg = pd.NamedAgg(column='Cash_Per_Share', aggfunc='mean')
company_tangible_agg = pd.NamedAgg(column='Tangible_Book_Value_Per_Share', aggfunc='median')
company_free_cash_agg = pd.NamedAgg(column='Free_Cash_Flow_Per_Share', aggfunc='mean')
company_discounted_agg = pd.NamedAgg(column='Discounted_Cash_Flow', aggfunc='mean')
company_yield_agg = pd.NamedAgg(column='Free_Cash_Flow_Yield_PCT', aggfunc='mean')
company_pe_agg = pd.NamedAgg(column='Price_To_Earnings', aggfunc='median')
company_roe_agg = pd.NamedAgg(column='Return_On_Equity_PCT', aggfunc='median')

各集計の結果は、列を含むデータフレームにgroup byを追加することで計算されます。「mean_10yr_cash」などの名前付き結果は、結果の集計データフレームの列名になります。以下に示すように、単一のデータフレームgroup byから複数の名前付き集計結果を取得できることが分かります。これは基本的に1つのステートメントです。

# Get results for each aggregate
company_cash_all = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(mean_10yr_cash=company_cash_agg,                          mean_10yr_free_cash=company_free_cash_agg,
mean_10yr_discounted_cash=company_discounted_agg,
mean_10yr_cash_yield=company_yield_agg)

company_metrics = company_ratios_forecast_df.groupby(['Ticker', 'CompanyName']).agg(median_tangible_bv_share=company_tangible_agg, median_pe=company_pe_agg, median_roe_pct=company_roe_agg)

# Non named aggregates
df_div = forecasts_df.groupby('CalendarYear')['Dividend_Per_Share_Growth'].agg(['mean', 'median', 'quantile'])
df_sga = forecasts_df.groupby('CalendarYear')['Debt_Growth_PCT'].agg(['mean', 'median', 'quantile'])

結果:

会社の現金合計
Company Cash Aggregates
企業指標の集計
Company Metrics Aggregates













1株当たり配当金成長合計
Dividend Per Share Growth
債務増加率合計
Debt Growth PCT














Pandasの結果をグラフ化する

例の集計出力をより分かりやすくするために、 matplotlibseabornを使って結果をグラフ化することにしました。グラフ化することで、出力計算を見ただけでは分からない文脈情報が提供されることがあります。

グラフを作成するパンダ
Pandas creating graphs

以下のコードブロックには、結果データセットがプロットされるコンテキストを示すコメントが含まれています。matplotlibの知識があれば比較的簡単に実行できますが、そうでない場合は構文を調べる必要があるかもしれません。グラフの作成方法を知っている場合は、ライブラリがグラフ作成で使用されている用語を使用しているため、ある程度直感的に理解できます。

# 1. Dividend Growth and Debt Growth Over Time
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Dividend Growth Plot
df_div.plot(ax=ax1, marker='o')
ax1.set_title('Dividend Per Share Growth Over Time')
ax1.set_xlabel('Calendar Year')
ax1.set_ylabel('Growth Rate')
ax1.legend(['Mean', 'Median', 'Quantile'])
ax1.grid(True)

# Debt Growth Plot
df_sga.plot(ax=ax2, marker='o')
ax2.set_title('Debt Growth Percentage Over Time')
ax2.set_xlabel('Calendar Year')
ax2.set_ylabel('Growth Rate (%)')
ax2.legend(['Mean', 'Median', 'Quantile'])
ax2.grid(True)

plt.tight_layout()
plt.show()

# 2. Company Metrics Distribution
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

# Cash Metrics
sns.boxplot(data=company_cash_all, ax=ax1)
ax1.set_title('Distribution of Cash Metrics')
ax1.tick_params(axis='x', rotation=45)

# Company Performance Metrics
sns.boxplot(data=company_metrics, ax=ax2)
ax2.set_title('Distribution of Company Performance Metrics')
ax2.tick_params(axis='x', rotation=45)

# Top 10 Companies by Free Cash Flow Yield
top_cash_yield = company_cash_all.nlargest(10, 'mean_10yr_cash_yield')
sns.barplot(data=top_cash_yield.reset_index(), x='CompanyName', y='mean_10yr_cash_yield', ax=ax3)
ax3.set_title('Top 10 Companies by Free Cash Flow Yield')
ax3.tick_params(axis='x', rotation=45)

# Top 10 Companies by ROE
top_roe = company_metrics.nlargest(10, 'median_roe_pct')
sns.barplot(data=top_roe.reset_index(), x='CompanyName', y='median_roe_pct', ax=ax4)
ax4.set_title('Top 10 Companies by Return on Equity')
ax4.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# 3. Correlation Matrix of Key Metrics; Combine relevant metrics
correlation_metrics = pd.merge(
    company_cash_all.reset_index(),
    company_metrics.reset_index(),
    on=['Ticker', 'CompanyName']
)

correlation_cols = [col for col in correlation_metrics.columns
                   if col not in ['Ticker', 'CompanyName']]
correlation_matrix = correlation_metrics[correlation_cols].corr()

plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix of Company Metrics')
plt.tight_layout()
plt.show()
相関行列

棒グラフ
Bar graphs

折れ線グラフ
Line graphs

PandasとApache Icebergを使った分析のユースケースの可能性について、少しでもご理解いただけたかと思います。近い将来、Apache SparkとIcebergを組み合わせた、他のバリエーションも検討していく予定です。まずは、Icebergからソースデータを読み込み、Sparkで集計と計算を行い、その結果をIcebergに書き戻すという手法です。お読みいただきありがとうございました。

手を振って別れを告げるパンダ
Pandas waving bye


bottom of page