Apache Iceberg, Hadoop y Hive: Abra su Datalake (Lakehouse) -> Parte II
- Claude Paugh
- 24 jun
- 7 Min. de lectura
En este artículo, demostraré el acceso de los usuarios a los metadatos de Hive y los mecanismos utilizados para crear conjuntos de resultados. Espero poder demostrar cómo se pueden abrir los datos de datalakes o lakehouses a los usuarios.

Antes de empezar, necesito hacer una advertencia. En mi publicación anterior, proporcioné un ejemplo de inserción y una consulta en mi base de datos de Hive. Debería haber mencionado que, debido a la falta de recursos, tuve que ajustar el rendimiento de mi instalación local de Hadoop y Hive.
La escritura y la lectura eran especialmente lentas localmente, así que realicé algunos cambios. A continuación, se muestran la mayoría de los cambios que realicé para mejorar el rendimiento. He comentado los parámetros importantes en las líneas.
sitio principal.xml
<property>
<name>io.file.buffer.size</name>
<value>524288</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>350</value>
</property>
<property>
<property>
<name>iceberg.engine.hive.enabled</name>
<value>true</value>
</property>
hdfs-site.xml
<property>
<name>dfs.block.size</name>
<value>134217728</value> # Increased from default
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>40</value> # Increased from default
</property>
<property>
<name>dfs.namenode.avoid.read.slow.datanode</name>
<value>true</value> # Added
</property>
sitio mapred.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value> # Added enables parallel compression
</property>
<property>
<name>mapred.map.child.java.opts</name>
<value>-Xmx512M</value> # Reduced from 1GB default
</property>
<property>
<name>mapred.reduce.child.java.opts</name>
<value>-Xmx512M</value>. # Reduced from 1GB default
</property>
<property>
<name>mapreduce.map.cpu.vcores</name>
<value>2</value> # Larger, Default is 1
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx3768m</value>
</property>
sitio-de-hilados.xml
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value> # New - matches the mapred allocation
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>localhost</value> # New, I have two ethernet interfaces, so avoiding wireless IP
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value> # New, allocate fixed size for scheduler
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>true</value> # New, check physical memory available
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>true</value> # New, check virtual memory available
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
<value>0.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
<value>100.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16392</value> # Increased from default
<description>Physical memory, in MB, to be made available to running containers</description>
</property>
Partiendo de la última publicación , comencé con una interfaz bastante común que utilizan los almacenes de datos para acceder a sus datos. Configuré una conexión JDBC simple a mi base de datos Hive y al datalake subyacente, como: jdbc:hive2:// localhost:10001/default .
Luego, consulté los datos que había cargado previamente mediante Spark. Estaba usando JetBrains DataGrip de nuevo. Mi esquema de Hive se veía así:

Las tablas están relacionadas con acciones y creé la tabla "company_forecasts_2024" con un trabajo de Apache Spark que unió mis conjuntos de datos "company_stocks" y "forecasts" usando marcos de datos.
Es ligeramente diferente, ya que tengo 10 archivos de datos que conforman los pronósticos. El script de PySpark que usé se muestra a continuación. Adopté un enfoque más "de fábrica" para sincronizar el esquema y que coincida con las tablas de Hive, en lugar de codificarlo como antes.
El flujo básico (diagrama a continuación) consiste en leer los 10 archivos para pronósticos, crear un marco de datos y, a continuación, leer la definición actual de la tabla "pronósticos" en la base de datos de Hive, ya que quería conservar las propiedades de Iceberg para su almacenamiento.
A continuación, comparé el marco de datos creado por la lectura del archivo Spark con la tabla de Hive existente. Necesitaba cambiar los tipos y el orden de las columnas para que coincidieran con la tabla de Hive y guardar los datos mediante un método de marco de datos. Después, añadí los datos leídos a la tabla en Hive.

En el siguiente paso, leí los "pronósticos" y "stocks_de_la_empresa" actualizados de Hive y realicé un filtrado de "combinación interna" para solo 2024 datos. A continuación, fusioné elementos específicos de cada tabla en un nuevo marco de datos. El paso final fue escribir ese marco de datos en una nueva tabla en Hive; el tiempo total fue de aproximadamente 5 segundos. El código y la configuración cambian. spark-defaults.co nf ; hay algún ajuste de línea en el archivo de configuración.
Script de PySpark
from datetime import datetime
from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme
spark = (SparkSession
.builder
.appName("Python Spark SQL Hive integration example")
.master("local[*]")
.config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hive")
.config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.defaultCatalog", "local")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalogImplementation", "hive")
.config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
.config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.warehouse.external.enabled", "true")
.enableHiveSupport()
.getOrCreate())
df = spark.read.csv('hdfs://localhost:9000/input_files/forecast-*.txt',
header=True, inferSchema=True, sep='|')
# spark is an existing SparkSession
hive_jdbc = {
"url": "jdbc:hive2://localhost:10001/default",
"driver": "org.apache.hive.jdbc.HiveDriver",
"table": "forecasts"
}
existing_df = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"))
target_schema = existing_df.schema.names
target_columns = existing_df.schema.fieldNames()
target_columns = [c.replace("forecasts.", "") for c in target_columns]
target_table = hive_jdbc.get("table")
# Create a mapping between upper and lower case column names
column_mapping = {col_name: col_name for col_name in target_columns}
# First rename the columns to match case
for old_col in df.columns:
if old_col.lower() in column_mapping:
print(f"{old_col}, {old_col.lower()}")
df = df.withColumnRenamed(old_col, column_mapping[old_col.lower()])
print(df.printSchema())
# Reorder columns to match existing_df
df = df.select(target_columns)
print(df.count())
forecast_df = spark.read.table("default.forecasts")
company_stocks_df = spark.read.table("default.company_stocks")
jn_df = forecast_df.join(company_stocks_df, [forecast_df.ticker == company_stocks_df.stocksymbol, forecast_df.calendaryear==2024], how="inner")
selected_df = jn_df.select("ticker", "calendaryear", "exchange", "sector", "isin", "debt_growth_pct",
"dividend_per_share_growth", "ebitda_growth_pct", "eps_diluted_growth_pct", "eps_growth_pct", "cash_per_share", "revenue_growth_pct", "net_income_growth_pct")
selected_df.write.saveAsTable("default.company_forecasts_2024", format="iceberg", mode="overwrite")
print(jn_df.count())
print(jn_df.show())
spark-defaults.conf
spark.jars.packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri thrift://localhost:9083
spark.sql.warehouse.dir hdfs://ilocalhost:9000/user/hive/warehouse
spark.sql.catalogImplementation hive
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type hive
spark.sql.catalog.spark_catalog.warehouse hdfs://localhost:9000/user/spark/warehouse
spark.sql.catalog.local org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type hive
spark.sql.catalog.local.warehouse /Volumes/ExtShield/opt/local_catalog
spark.sql.hive.metastore.version 2.3.10
spark.sql.hive.metastore.jars builtin
spark.sql.hive.metastore.jars.path $HIVE_HOME/lib/*
spark.sql.hive.metastore.sharedPrefixes org.postgresql
spark.sql.hive.hiveserver2.jdbc.url jdbc:hive2://localhost:10001/default
Acceso a datos para Datalake
Para consultar los datos mediante una herramienta externa, comencé con una conexión JDBC básica a Hive usando DataGrip, lo que generó la captura de pantalla del esquema anterior. Tras la carga de datos, creé vistas que recuperaban subconjuntos de datos según los valores ISIN. La consulta básica es la misma que se muestra a continuación; la única diferencia radica en que los caracteres iniciales del valor ISIN coinciden con el código de país ISO2 mediante un comodín.
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%';
Posteriormente, creé diferentes vistas filtrando los valores ISIN y usé una para comparar la ejecución de una sentencia SQL de unión con una simple selección desde la vista. A continuación se muestran los resultados; en resumen, son prácticamente iguales. Ambas utilizan MapReduce a través de Hadoop y dedican aproximadamente el mismo tiempo.
SELECT de Hive con JDBC
SELECCIONAR usando Unir
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth, fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share, fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%' [2025-06-20 18:07:59] 500 rows retrieved starting from 1 in 17 s 530 ms (execution: 17 s 262 ms, fetching: 268 ms) -------------------------------------------------------------------------------------------------- Query ID = claude_paugh_20250620180742_2bf50365-811c-4e27-b61e-7d7181339c9c Total jobs = 2 Stage-5 is filtered out by condition resolver. Stage-1 is selected by condition resolver. Launching Job 1 out of 2 Number of reduce tasks not specified. Estimated from input data size: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> Starting Job = job_1750435886619_0016, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0016/ Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0016 Hadoop job information for Stage-1:
number of mappers: 2; number of reducers: 1
2025-06-20 18:07:47,945 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:07:53,120 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:07:57,237 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:07:58,268 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0016
MapReduce Jobs Launched: Stage-Stage-1: Map: 2 Reduce: 1
HDFS Read: 22020529 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS Total MapReduce CPU Time Spent: 0 msec
SELECCIONAR usando Ver
default> select * from default.vw_forecasts_us_2024
2025-06-20 18:12:49] 500 rows retrieved starting from 1 in 18 s 546 ms (execution: 18 s 281 ms, fetching: 265 ms)
----------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620181230_1794bd84-e171-48ed-9915-a95ea5717a21
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0018, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0018/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0018
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:12:36,547 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:12:42,767 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:12:46,875 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:12:47,913 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0018
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 HDFS Read: 22021017 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
[
Probé varios otros escenarios de unión simples con una vista y los resultados fueron similares. Mi conclusión inicial es que, a menos que se necesiten las tablas base para otros datos, algunos de estos escenarios de filtro o agregación funcionan mejor con uniones de dataframes de Apache Spark.
No es de extrañar, pero si los usuarios necesitan esas tablas base para otros casos de uso, que Hadoop las procese es una alternativa viable; solo hay que esperar la diferencia en la ejecución. No he optimizado completamente la infraestructura de Hadoop, así que probablemente también podría haber mejorado los tiempos de ejecución de MapReduce.
Tuve algunas limitaciones de tiempo para este artículo, así que no pude probar Apache Drill ni Apache Superset para una mejor experiencia de usuario. Sin embargo, como las conexiones de Hive están disponibles en ambos, deberías poder acceder a los datos. Voy a intentar comparar Drill y Superset para poder dedicar más tiempo a la comparación.
El ejemplo que proporcioné se centró exclusivamente en Hive, pero también se pueden usar herramientas adicionales como Drill para acceder al sistema de archivos HDFS y al contenido de los archivos, de modo que JDBC y Hive se centran en los datos estructurados que se tienen. Existen muchos motores de consulta similares que también cuentan con conectores, lo que permite consultar el sistema de archivos HDFS, analizar JSON, XML y archivos delimitados, además de usar conexiones JDBC. Presto es un motor común, al igual que Glue de AWS, que permite abrir diferentes formatos de datos.
Un último punto: Los modelos de datalake y lakehouse han adoptado, de forma flexible, un modelo de control de calidad de datos para la descarga, depuración, transformación y liberación de datos, que se ha utilizado durante muchos años en almacenes de datos. Este flujo de trabajo está diseñado para la calidad de los datos en diferentes niveles, algo que suele requerir un lakehouse. Los datalakes suelen utilizar la descarga y la depuración ligera para presentar los datos a los usuarios.