
Apache Iceberg, Hadoop, and Hive: Opening Up Your Data Lake (Lakehouse) -> Part 2

In this article, I will demonstrate how users can access Hive metadata and the mechanics used to produce result sets. My aim is to show how DataLake or LakeHouse data can be opened up to users.
Spark Dataframes and Hive

Before getting started, I need to revisit my earlier notes. In the previous article I provided an insert example and a query against my Hive database. I should point out that, because of limited resources, I did need to do some performance tuning on my local Hadoop and Hive installation.


Local writes and reads were particularly slow, so I made some modifications. Below are most of the changes I made to improve performance; I've annotated the notable parameters inline.




core-site.xml

<property>
	<name>io.file.buffer.size</name>
	<value>524288</value>
</property>
<property>
	<name>fs.inmemory.size.mb</name>
	<value>350</value>
</property>
<property>
	<name>iceberg.engine.hive.enabled</name>
	<value>true</value>
</property>

hdfs-site.xml

<property>
    <name>dfs.block.size</name>
    <value>134217728</value>  <!-- Increased from default -->
</property>
<property>
    <name>dfs.namenode.handler.count</name>
    <value>40</value>  <!-- Increased from default -->
</property>
<property>
    <name>dfs.namenode.avoid.read.slow.datanode</name>
    <value>true</value>  <!-- Added -->
</property>

mapred-site.xml

<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>  <!-- Added: enables parallel compression -->
</property>
<property>
    <name>mapred.map.child.java.opts</name>
    <value>-Xmx512M</value>  <!-- Reduced from 1GB default -->
</property>
<property>
    <name>mapred.reduce.child.java.opts</name>
    <value>-Xmx512M</value>  <!-- Reduced from 1GB default -->
</property>
<property>
    <name>mapreduce.map.cpu.vcores</name>
    <value>2</value>  <!-- Larger; default is 1 -->
</property>
<property>
    <name>yarn.app.mapreduce.am.command-opts</name>
    <value>-Xmx3768m</value>
</property>

yarn-site.xml

<property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>2</value>  <!-- New: matches the mapred allocation -->
</property>
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>localhost</value>  <!-- New: I have two ethernet interfaces, so this avoids the wireless IP -->
</property>
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>512</value>  <!-- New: allocate a fixed size for the scheduler -->
</property>
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>true</value>  <!-- New: check physical memory available -->
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>true</value>  <!-- New: check virtual memory available -->
</property>
<property>
    <name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
    <value>0.0</value>  <!-- Disk safety check -->
</property>
<property>
    <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
    <value>100.0</value>  <!-- Disk safety check -->
</property>
<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>16392</value>  <!-- Increased from default -->
    <description>Physical memory, in MB, to be made available to running containers</description>
</property>

Continuing from the previous article, I started with a fairly common interface for accessing data from a data store: a simple JDBC connection to my Hive database and the underlying data lake, as shown here: jdbc:hive2://localhost:10001/default
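For readers who prefer to script that connection rather than use a GUI client, here is a minimal sketch using the jaydebeapi package (my assumption; the article itself uses DataGrip). The path to the Hive JDBC standalone jar is hypothetical and should point at your local copy.

import jaydebeapi

# Connect to HiveServer2 over JDBC, using the same driver class and URL as the article
conn = jaydebeapi.connect(
    "org.apache.hive.jdbc.HiveDriver",
    "jdbc:hive2://localhost:10001/default",
    ["", ""],                               # user / password (none on this local setup)
    "/path/to/hive-jdbc-standalone.jar"     # hypothetical jar location
)
curs = conn.cursor()
curs.execute("SHOW TABLES")
print(curs.fetchall())
curs.close()
conn.close()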


I then started querying the data previously loaded through Spark, again using JetBrains DataGrip. My Hive schema looks like this:

Hive Database Schema

The tables are stock-related, and I created the "company_forecasts_2024" table with an Apache Spark job that joined my "company_stocks" and "forecasts" datasets using dataframes.


This run was slightly different because "forecasts" is made up of 10 data files. The PySpark script I used is below. I took a more "factory" approach, synchronizing the schema to match the Hive table rather than hard-coding it as before.


The basic flow (shown below) is to read the 10 forecast files into a dataframe, then read the current "forecasts" table definition from the Hive database, since I want to keep the Iceberg properties for storage.


Next, I compare the dataframe produced by the Spark file read against the existing Hive table. I need to change the types and column order to match the Hive table so the data can be saved through the dataframe API, and then I append the data from the file read to the Hive table.

Joined Dataframes in Spark

In the next step, I read the updated "forecasts" and "company_stocks" tables from Hive and perform an inner join, filtering to 2024 data only. Specific columns from each table are then combined into a new DataFrame. The final step is writing that DataFrame to a new table in Hive --> about 5 seconds in total. The code and the spark-defaults.conf changes follow; note that the conf file contains some line wraps.

PySpark Script

from datetime import datetime
from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme

spark = (SparkSession
         .builder
         .appName("Python Spark SQL Hive integration example")
		 .master("local[*]")
         .config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
         .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
         .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.local.type", "hive")
         .config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
         .config("spark.sql.defaultCatalog", "local")
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
		 .config("spark.sql.catalogImplementation", "hive")
         .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
         .config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
         .config("spark.sql.warehouse.external.enabled", "true")
         .enableHiveSupport()
         .getOrCreate())

df = spark.read.csv('hdfs://localhost:9000/input_files/forecast-*.txt',
                                   header=True, inferSchema=True, sep='|')
# spark is an existing SparkSession
hive_jdbc = {
	"url": "jdbc:hive2://localhost:10001/default",
	"driver": "org.apache.hive.jdbc.HiveDriver",
	"table": "forecasts"
}

existing_df = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"))
target_schema = existing_df.schema.names
target_columns = existing_df.schema.fieldNames()
target_columns = [c.replace("forecasts.", "") for c in target_columns]
target_table = hive_jdbc.get("table")

# Map lower-case column names to the target (Hive) column names
column_mapping = {col_name.lower(): col_name for col_name in target_columns}

# First rename the columns to match case
for old_col in df.columns:
    if old_col.lower() in column_mapping:
        print(f"{old_col}, {old_col.lower()}")
        df = df.withColumnRenamed(old_col, column_mapping[old_col.lower()])

df.printSchema()
# Reorder columns to match existing_df
df = df.select(target_columns)
print(df.count())
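
# The published script stops at the count above; per the flow described earlier,
# two steps are assumed to follow (a sketch, not the author's exact code):
# cast each column to the type defined in the existing Hive/Iceberg table,
# then append the file data to the "forecasts" table.
for field in existing_df.schema.fields:
    name = field.name.replace("forecasts.", "")
    df = df.withColumn(name, col(name).cast(field.dataType))

# Append the aligned dataframe to the existing Iceberg table (DataFrameWriterV2 API)
df.writeTo("default.forecasts").append()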

forecast_df = spark.read.table("default.forecasts")
company_stocks_df = spark.read.table("default.company_stocks")

jn_df = forecast_df.join(company_stocks_df, [forecast_df.ticker == company_stocks_df.stocksymbol, forecast_df.calendaryear==2024], how="inner")
selected_df = jn_df.select("ticker", "calendaryear", "exchange", "sector", "isin", "debt_growth_pct",
                           "dividend_per_share_growth", "ebitda_growth_pct", "eps_diluted_growth_pct", "eps_growth_pct", "cash_per_share", "revenue_growth_pct", "net_income_growth_pct")

selected_df.write.saveAsTable("default.company_forecasts_2024", format="iceberg", mode="overwrite")

print(jn_df.count())
jn_df.show()

spark-defaults.conf

spark.jars.packages                                  	org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions                                 		org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri			     		thrift://localhost:9083
spark.sql.warehouse.dir				     		hdfs://localhost:9000/user/hive/warehouse
spark.sql.catalogImplementation			     		hive
spark.sql.catalog.spark_catalog					org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type				hive
spark.sql.catalog.spark_catalog.warehouse		       hdfs://localhost:9000/user/spark/warehouse
spark.sql.catalog.local 						org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type 					hive
spark.sql.catalog.local.warehouse 				/Volumes/ExtShield/opt/local_catalog

spark.sql.hive.metastore.version		     			2.3.10
spark.sql.hive.metastore.jars			     		builtin
spark.sql.hive.metastore.jars.path		     		$HIVE_HOME/lib/*
spark.sql.hive.metastore.sharedPrefixes		     		org.postgresql
spark.sql.hive.hiveserver2.jdbc.url		     		jdbc:hive2://localhost:10001/default

Data Access to the Datalake

To query the data through a third-party tool, I first set up a basic JDBC connection to Hive in DataGrip, which produced the schema screenshot above. Once the data was loaded, I created views that retrieve subsets of the data based on ISIN values. The base query is the same as the one shown below, except that the leading characters of the ISIN value are matched against an ISO2 country code using a wildcard.

select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
       fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
       fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%';
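
The article does not show the DDL for those views, but based on the query above, the view queried later (vw_forecasts_us_2024) could be defined roughly like this, run over the same Hive JDBC connection (a sketch, not the author's exact definition):

CREATE VIEW IF NOT EXISTS default.vw_forecasts_us_2024 AS
SELECT cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country,
       fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
       fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share,
       fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
       fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
FROM company_stocks cs
LEFT JOIN forecasts fc
  ON (cs.stocksymbol = fc.ticker AND fc.calendaryear = '2024')
WHERE cs.isin LIKE 'US%';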

I then created different views that filter on ISIN values, and I used one of them to compare the execution of the SQL join statement against a simple query of the view. The results are below; in short, they are nearly identical. Both use MapReduce through Hadoop and take roughly the same amount of time.


Selecting from Hive over JDBC

SELECT with a Join

select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
       fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
       fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%'
[2025-06-20 18:07:59] 500 rows retrieved starting from 1 in 17 s 530 ms (execution: 17 s 262 ms, fetching: 268 ms)
--------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620180742_2bf50365-811c-4e27-b61e-7d7181339c9c
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0016, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0016/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job  -kill job_1750435886619_0016
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:07:47,945 Stage-1 map = 0%,  reduce = 0%
2025-06-20 18:07:53,120 Stage-1 map = 50%,  reduce = 0%
2025-06-20 18:07:57,237 Stage-1 map = 100%,  reduce = 0%
2025-06-20 18:07:58,268 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_1750435886619_0016
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 2  Reduce: 1   HDFS Read: 22020529 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec

SELECT from a View

default> select * from default.vw_forecasts_us_2024
[2025-06-20 18:12:49] 500 rows retrieved starting from 1 in 18 s 546 ms (execution: 18 s 281 ms, fetching: 265 ms)
----------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620181230_1794bd84-e171-48ed-9915-a95ea5717a21
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0018, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0018/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job  -kill job_1750435886619_0018
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:12:36,547 Stage-1 map = 0%,  reduce = 0%
2025-06-20 18:12:42,767 Stage-1 map = 50%,  reduce = 0%
2025-06-20 18:12:46,875 Stage-1 map = 100%,  reduce = 0%
2025-06-20 18:12:47,913 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_1750435886619_0018
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 2  Reduce: 1   HDFS Read: 22021017 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec


I tested several other simple join scenarios against views with similar results. My initial conclusion is that unless you need the base tables to hold other data, some filtering and/or aggregation scenarios will get better performance with Apache Spark DataFrame joins.


That is not surprising, but if users need those base tables for other use cases, letting Hadoop handle them is a viable option; the difference is simply execution speed. I also haven't fully optimized the Hadoop infrastructure, so the MapReduce execution times could probably be improved as well.


Because time was limited while writing this article, I didn't get a chance to try Apache Drill or Apache Superset; both provide Hive connectivity and should be able to access the data. I plan to try a Drill versus Superset face-off next, which will give me more time for the comparison.


The examples I've given focus on Hive, but you can also use other tools such as Drill to access the HDFS filesystem and file contents, so JDBC and Hive cover the structured data you have. Many similar query engines also come with connectors, so you can query the HDFS filesystem, parse JSON, XML, and delimited files, and use JDBC connections. Presto is one common query engine, and AWS Glue can also "open up" different data formats.


One final point: the Datalake and Lakehouse models broadly adopt the data quality control model that data warehouses have used for years: land, cleanse, transform, and publish the data. That workflow is really meant to ensure the multiple, distinct levels of data quality a Lakehouse typically requires. Datalakes most often use landing plus a "light" cleanse to present data to users.
