top of page

Apache Iceberg、Hadoop、Hive: データレイク (Lakehouse) を開く -> パート II

この記事では、Hiveメタデータへのユーザーアクセスと、結果セットの作成に使用されるメカニズムについて説明します。データレイクやレイクハウスのデータをユーザーに公開する方法を説明できれば幸いです。
Spark データフレームと Hive
Spark Dataframes and Hive

始める前に注意点を一つ。前回の投稿では、挿入の例とHiveデータベースへのクエリを紹介しました。リソース不足のため、ローカルのHadoopとHiveのインストールでパフォーマンスチューニングを行う必要があったことをお伝えすべきでした。


ローカルでの書き込みと読み込みが特に遅かったので、いくつか変更を加えました。パフォーマンス向上のために行った変更点の大部分は以下のとおりです。注目すべきパラメータについては、行にコメントを付けてあります。




コアサイト.xml

<property>
	<name>io.file.buffer.size</name>
	<value>524288</value>
</property>
<property>
	<name>fs.inmemory.size.mb</name>
	<value>350</value>
</property>
<property>
<property>
	<name>iceberg.engine.hive.enabled</name>
	<value>true</value>
</property>

hdfs-site.xml

<property>
      <name>dfs.block.size</name>
      <value>134217728</value>  # Increased from default
  </property>
  <property>
      <name>dfs.namenode.handler.count</name>
      <value>40</value> # Increased from default
  </property>
  <property>
      <name>dfs.namenode.avoid.read.slow.datanode</name>
      <value>true</value> # Added
  </property>

mapred-site.xml

 <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
 </property>

 <property>
 	<name>mapreduce.map.output.compress.codec</name>
	<value>org.apache.hadoop.io.compress.SnappyCodec</value>  # Added enables parallel compression
 </property>
 <property>
      <name>mapred.map.child.java.opts</name>
      <value>-Xmx512M</value>  # Reduced from 1GB default
  </property>

 <property>
     <name>mapred.reduce.child.java.opts</name>
     <value>-Xmx512M</value>. # Reduced from 1GB default
 </property>
 <property>
     <name>mapreduce.map.cpu.vcores</name>
     <value>2</value>  # Larger, Default is 1
 </property>
 <property>
     <name>yarn.app.mapreduce.am.command-opts</name>
     <value>-Xmx3768m</value>
 </property>

糸サイト.xml

<property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>2</value> # New - matches the mapred allocation
</property>
 
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>localhost</value> # New, I have two ethernet interfaces, so avoiding wireless IP
</property>
    
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>512</value> # New, allocate fixed size for scheduler
</property>
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>true</value> # New, check physical memory available
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>true</value> # New, check virtual memory available
</property>
<property>
    <name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
    <value>0.0</value> # Disk safety check
</property>
<property>
    <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
    <value>100.0</value> # Disk safety check
</property>
   
 <property>
     <name>yarn.nodemanager.resource.memory-mb</name>
     <value>16392</value> # Increased from default
     <description>Physical memory, in MB, to be made available to running containers</description>
 </property>

前回の投稿から引き続き、データストアがデータにアクセスするために使用する、比較的一般的なインターフェースから始めました。Hiveデータベースとその基盤となるデータレイクへのシンプルなJDBC接続を、 jdbc:hive2:// localhost:10001/defaultとして設定しました


次に、Spark経由でロードしたデータに対してクエリを実行しました。今回もJetBrains DataGripを使用しました。Hiveスキーマは以下のようになっていました。

Hive データベーススキーマ
Hive Database Schema

テーブルは株式関連であり、データフレームを使用して「company_stocks」と「forecasts」データセットを結合した Apache Spark ジョブを使用して「company_forecasts_2024」テーブルを作成しました。


少し異なるのは、「予測」を構成するデータファイルが10個あることです。使用したPySparkスクリプトは以下の通りです。以前のようにハードコーディングするのではなく、より「ファクトリー」なアプローチでスキーマをHiveテーブルと同期させるようにしました。


基本的なフロー(下図)は、予測用の10個のファイルを読み込み、データフレームを作成します。次に、Icebergのプロパティをストレージに保存したかったため、Hiveデータベース内の現在の「予測」テーブル定義を読み取ります。


次に、Sparkファイル読み取りによって作成されたデータフレームを既存のHiveテーブルと比較しました。データフレームメソッドを使用してデータを保存するには、Hiveテーブルに合わせて型と列の順序を変更する必要がありました。その後、ファイル読み取りデータをHiveのテーブルに追加しました。

Sparkで結合されたデータフレーム
Joined Dataframes in Spark

次のステップでは、更新された「forecasts」と「company_stocks」の両方をHiveから読み込み、「inner join」フィルタリングを実行して2024年のデータのみを取得します。次に、各テーブルの特定の要素を新しいデータフレームにマージします。最後のステップは、そのデータフレームをHiveの新しいテーブルに書き込むことです --> 合計時間は約5秒でした。コードと設定の変更 spark-defaults.co nf ; conf ファイル内に行の折り返しがあります。

PySpark スクリプト

from datetime import datetime
from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme

spark = (SparkSession
         .builder
         .appName("Python Spark SQL Hive integration example")
		 .master("local[*]")
         .config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
         .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
         .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.local.type", "hive")
         .config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
         .config("spark.sql.defaultCatalog", "local")
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
		 .config("spark.sql.catalogImplementation", "hive")
         .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
         .config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
         .config("spark.sql.warehouse.external.enabled", "true")
         .enableHiveSupport()
         .getOrCreate())

df = spark.read.csv('hdfs://localhost:9000/input_files/forecast-*.txt',
                                   header=True, inferSchema=True, sep='|')
# spark is an existing SparkSession
hive_jdbc = {
	"url": "jdbc:hive2://localhost:10001/default",
	"driver": "org.apache.hive.jdbc.HiveDriver",
	"table": "forecasts"
}

existing_df = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"))
target_schema = existing_df.schema.names
target_columns = existing_df.schema.fieldNames()
target_columns = [c.replace("forecasts.", "") for c in target_columns]
target_table = hive_jdbc.get("table")

# Create a mapping between upper and lower case column names
column_mapping = {col_name: col_name for col_name in target_columns}

# First rename the columns to match case
for old_col in df.columns:
    if old_col.lower() in column_mapping:
        print(f"{old_col}, {old_col.lower()}")
        df = df.withColumnRenamed(old_col, column_mapping[old_col.lower()])

print(df.printSchema())
# Reorder columns to match existing_df
df = df.select(target_columns)
print(df.count())

forecast_df = spark.read.table("default.forecasts")
company_stocks_df = spark.read.table("default.company_stocks")

jn_df = forecast_df.join(company_stocks_df, [forecast_df.ticker == company_stocks_df.stocksymbol, forecast_df.calendaryear==2024], how="inner")
selected_df = jn_df.select("ticker", "calendaryear", "exchange", "sector", "isin", "debt_growth_pct",
                           "dividend_per_share_growth", "ebitda_growth_pct", "eps_diluted_growth_pct", "eps_growth_pct", "cash_per_share", "revenue_growth_pct", "net_income_growth_pct")

selected_df.write.saveAsTable("default.company_forecasts_2024", format="iceberg", mode="overwrite")

print(jn_df.count())
print(jn_df.show())

spark-defaults.conf

spark.jars.packages                                  	org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions                                 		org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri			     		thrift://localhost:9083
spark.sql.warehouse.dir				     		hdfs://ilocalhost:9000/user/hive/warehouse
spark.sql.catalogImplementation			     		hive
spark.sql.catalog.spark_catalog					org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type				hive
spark.sql.catalog.spark_catalog.warehouse		       hdfs://localhost:9000/user/spark/warehouse
spark.sql.catalog.local 						org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type 					hive
spark.sql.catalog.local.warehouse 				/Volumes/ExtShield/opt/local_catalog

spark.sql.hive.metastore.version		     			2.3.10
spark.sql.hive.metastore.jars			     		builtin
spark.sql.hive.metastore.jars.path		     		$HIVE_HOME/lib/*
spark.sql.hive.metastore.sharedPrefixes		     		org.postgresql
spark.sql.hive.hiveserver2.jdbc.url		     		jdbc:hive2://localhost:10001/default

データレイクのデータアクセス

サードパーティツール経由でデータをクエリするために、まずDataGripを使ってHiveへの基本的なJDBC接続を確立し、上記のスキーマのスクリーンショットを作成しました。データロード後、ISIN値に基づいてデータのサブセットを取得するビューを作成しました。基本的なクエリは以下と同じですが、ISIN値の先頭の文字がワイルドカードマッチによってISO2国コードと一致する点が異なります。

select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
       fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
       fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%';

その後、ISIN値でフィルタリングする複数のビューを作成し、そのうちの1つを使って、結合SQL文の実行とビューからの単純なSELECT文の実行を比較しました。以下は結果ですが、簡単に言うとほぼ同じです。どちらもHadoop経由のMapReduceを使用しており、実行時間もほぼ同じです。


Hive から JDBC で SELECT する

Joinを使用したSELECT

select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,  fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share, fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%'  [2025-06-20 18:07:59] 500 rows retrieved starting from 1 in 17 s 530 ms (execution: 17 s 262 ms, fetching: 268 ms) -------------------------------------------------------------------------------------------------- Query ID = claude_paugh_20250620180742_2bf50365-811c-4e27-b61e-7d7181339c9c Total jobs = 2 Stage-5 is filtered out by condition resolver. Stage-1 is selected by condition resolver. Launching Job 1 out of 2 Number of reduce tasks not specified. Estimated from input data size: 1 In order to change the average load for a reducer (in bytes):   set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers:   set hive.exec.reducers.max=<number> In order to set a constant number of reducers:   set mapreduce.job.reduces=<number> Starting Job = job_1750435886619_0016, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0016/ Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job  -kill job_1750435886619_0016 Hadoop job information for Stage-1: 
number of mappers: 2; number of reducers: 1 
2025-06-20 18:07:47,945 Stage-1 map = 0%,  reduce = 0% 
2025-06-20 18:07:53,120 Stage-1 map = 50%,  reduce = 0% 
2025-06-20 18:07:57,237 Stage-1 map = 100%,  reduce = 0% 
2025-06-20 18:07:58,268 Stage-1 map = 100%,  reduce = 100% 
Ended Job = job_1750435886619_0016 
MapReduce Jobs Launched:  Stage-Stage-1: Map: 2  Reduce: 1   
HDFS Read: 22020529 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS Total MapReduce CPU Time Spent: 0 msec 

ビューを使用したSELECT

default> select * from default.vw_forecasts_us_2024
2025-06-20 18:12:49] 500 rows retrieved starting from 1 in 18 s 546 ms (execution: 18 s 281 ms, fetching: 265 ms)
----------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620181230_1794bd84-e171-48ed-9915-a95ea5717a21
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0018, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0018/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job  -kill job_1750435886619_0018
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:12:36,547 Stage-1 map = 0%,  reduce = 0%
2025-06-20 18:12:42,767 Stage-1 map = 50%,  reduce = 0%
2025-06-20 18:12:46,875 Stage-1 map = 100%,  reduce = 0%
2025-06-20 18:12:47,913 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_1750435886619_0018
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 2  Reduce: 1   HDFS Read: 22021017 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec

[

ビューに対して他のいくつかの単純な結合シナリオをテストしましたが、結果は同様でした。私の最初の結論は、他のデータのベーステーブルを必要としない限り、これらのフィルターや集計シナリオの一部は、Apache Sparkデータフレーム結合を使用する方がパフォーマンスが向上するということです。


驚くことではありませんが、ユーザーが他のユースケースでこれらのベーステーブルを必要とする場合、Hadoop で処理させるのは現実的な選択肢です。ただし、実行時間の違いは覚悟しておいてください。Hadoop インフラストラクチャを完全に最適化していないため、MapReduce の実行時間も改善できた可能性があります。


この記事は時間的な制約があったため、 Apache DrillApache Supersetを使ってユーザーエクスペリエンスを向上させる機会がありませんでした。しかし、どちらもHive接続が利用できるので、データにアクセスできるはずです。そこで、時間をかけて比較検討できるDrillとSupersetの対決に挑戦してみようと思います。


私が示した例はHiveのみに焦点を当てていますが、Drillなどの追加ツールを使用してHDFSファイルシステムとファイルコンテンツにアクセスすることもできます。つまり、JDBCとHiveは構造化データにアクセスしているということです。コネクタを備えたクエリエンジンも多数存在するため、HDFSファイルシステムのクエリ、JSON、XML、区切りファイルの解析、JDBC接続の使用が可能です。Prestoは一般的なツールであり、AWSのGlueは様々なデータ形式を「オープン」にすることができます。


最後にもう一つ。データレイクとレイクハウスのモデルは、データウェアハウスで長年使用されてきた、データのランディング、スクラビング、変換、リリースのためのデータ品質管理モデルを緩く採用しています。このワークフローは、レイクハウスで求められる様々なレベルのデータ品質を実現することを目的としています。データレイクでは、ユーザーにデータを提供するために、ランディングと「ライト」スクラビングが最も一般的に使用されています。

bottom of page