Apache Iceberg, Hadoop et Hive : ouvrez votre Datalake (Lakehouse) -> Partie II
- Claude Paugh
- 24 juin
- 7 min de lecture
Dans cet article, je vais démontrer l'accès des utilisateurs aux métadonnées Hive et les mécanismes utilisés pour créer des jeux de résultats. J'espère pouvoir vous montrer comment ouvrir les données d'un lac de données ou d'un lachouse aux utilisateurs.

Je dois revenir en arrière avant de commencer, avec une mise en garde. J'ai fourni un exemple d'insertion, ainsi qu'une requête sur ma base de données Hive, dans mon article précédent. J'aurais dû préciser que j'avais besoin d'optimiser les performances de mon installation Hadoop et Hive locale, faute de ressources.
L'écriture et la lecture étaient particulièrement lentes localement, j'ai donc apporté quelques modifications. Vous trouverez ci-dessous la plupart des modifications apportées pour améliorer les performances. J'ai commenté les paramètres importants.
core-site.xml
<property>
<name>io.file.buffer.size</name>
<value>524288</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>350</value>
</property>
<property>
<property>
<name>iceberg.engine.hive.enabled</name>
<value>true</value>
</property>
hdfs-site.xml
<property>
<name>dfs.block.size</name>
<value>134217728</value> # Increased from default
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>40</value> # Increased from default
</property>
<property>
<name>dfs.namenode.avoid.read.slow.datanode</name>
<value>true</value> # Added
</property>mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value> # Added enables parallel compression
</property>
<property>
<name>mapred.map.child.java.opts</name>
<value>-Xmx512M</value> # Reduced from 1GB default
</property>
<property>
<name>mapred.reduce.child.java.opts</name>
<value>-Xmx512M</value>. # Reduced from 1GB default
</property>
<property>
<name>mapreduce.map.cpu.vcores</name>
<value>2</value> # Larger, Default is 1
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx3768m</value>
</property>site-fil.xml
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value> # New - matches the mapred allocation
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>localhost</value> # New, I have two ethernet interfaces, so avoiding wireless IP
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value> # New, allocate fixed size for scheduler
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>true</value> # New, check physical memory available
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>true</value> # New, check virtual memory available
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
<value>0.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
<value>100.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16392</value> # Increased from default
<description>Physical memory, in MB, to be made available to running containers</description>
</property>Suite au dernier article , j'ai commencé avec une interface assez courante utilisée par les entrepôts de données pour accéder à leurs données. J'ai configuré une connexion JDBC simple à ma base de données Hive et au datalake sous-jacent : jdbc:hive2:// localhost:10001/default .
J'ai ensuite interrogé les données précédemment chargées via Spark. J'utilisais à nouveau JetBrains DataGrip. Mon schéma Hive ressemblait à ceci :

Les tables sont liées aux stocks et j'ai créé la table « company_forecasts_2024 » avec un travail Apache Spark qui a joint mes ensembles de données « company_stocks » et « forecasts » à l'aide de dataframes.
C'est légèrement différent, car j'ai 10 fichiers de données qui constituent des « prévisions ». Le script PySpark que j'ai utilisé est ci-dessous. J'ai adopté une approche plus « usine » pour synchroniser le schéma avec les tables Hive, au lieu de le coder en dur comme je le faisais auparavant.
Le flux de base (schéma ci-dessous) consiste à lire les 10 fichiers de prévisions, à créer un dataframe, puis à lire la définition actuelle de la table « prévisions » dans la base de données Hive, car je souhaitais conserver mes propriétés Iceberg pour le stockage.
Ensuite, j'ai comparé le dataframe créé par la lecture du fichier Spark à la table Hive existante. J'ai dû modifier les types et l'ordre des colonnes pour qu'ils correspondent à la table Hive afin d'enregistrer les données via une méthode dataframe. J'ai ensuite ajouté les données lues dans le fichier à la table dans Hive.

À l'étape suivante, j'ai lu les données « prévisions » et « actions_entreprises » mises à jour depuis Hive, puis j'ai effectué une jointure interne pour filtrer uniquement les données de 2024. J'ai ensuite fusionné des éléments spécifiques de chaque table dans un nouveau dataframe. La dernière étape consiste à écrire ce dataframe dans une nouvelle table dans Hive (temps total : environ 5 secondes). Le code et la configuration ont été modifiés. spark-defaults.co nf ; il y a un retour à la ligne dans le fichier de configuration.
Script PySpark
from datetime import datetime
from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme
spark = (SparkSession
.builder
.appName("Python Spark SQL Hive integration example")
.master("local[*]")
.config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hive")
.config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.defaultCatalog", "local")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalogImplementation", "hive")
.config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
.config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.warehouse.external.enabled", "true")
.enableHiveSupport()
.getOrCreate())
df = spark.read.csv('hdfs://localhost:9000/input_files/forecast-*.txt',
header=True, inferSchema=True, sep='|')
# spark is an existing SparkSession
hive_jdbc = {
"url": "jdbc:hive2://localhost:10001/default",
"driver": "org.apache.hive.jdbc.HiveDriver",
"table": "forecasts"
}
existing_df = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"))
target_schema = existing_df.schema.names
target_columns = existing_df.schema.fieldNames()
target_columns = [c.replace("forecasts.", "") for c in target_columns]
target_table = hive_jdbc.get("table")
# Create a mapping between upper and lower case column names
column_mapping = {col_name: col_name for col_name in target_columns}
# First rename the columns to match case
for old_col in df.columns:
if old_col.lower() in column_mapping:
print(f"{old_col}, {old_col.lower()}")
df = df.withColumnRenamed(old_col, column_mapping[old_col.lower()])
print(df.printSchema())
# Reorder columns to match existing_df
df = df.select(target_columns)
print(df.count())
forecast_df = spark.read.table("default.forecasts")
company_stocks_df = spark.read.table("default.company_stocks")
jn_df = forecast_df.join(company_stocks_df, [forecast_df.ticker == company_stocks_df.stocksymbol, forecast_df.calendaryear==2024], how="inner")
selected_df = jn_df.select("ticker", "calendaryear", "exchange", "sector", "isin", "debt_growth_pct",
"dividend_per_share_growth", "ebitda_growth_pct", "eps_diluted_growth_pct", "eps_growth_pct", "cash_per_share", "revenue_growth_pct", "net_income_growth_pct")
selected_df.write.saveAsTable("default.company_forecasts_2024", format="iceberg", mode="overwrite")
print(jn_df.count())
print(jn_df.show())spark-defaults.conf
spark.jars.packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri thrift://localhost:9083
spark.sql.warehouse.dir hdfs://ilocalhost:9000/user/hive/warehouse
spark.sql.catalogImplementation hive
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type hive
spark.sql.catalog.spark_catalog.warehouse hdfs://localhost:9000/user/spark/warehouse
spark.sql.catalog.local org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type hive
spark.sql.catalog.local.warehouse /Volumes/ExtShield/opt/local_catalog
spark.sql.hive.metastore.version 2.3.10
spark.sql.hive.metastore.jars builtin
spark.sql.hive.metastore.jars.path $HIVE_HOME/lib/*
spark.sql.hive.metastore.sharedPrefixes org.postgresql
spark.sql.hive.hiveserver2.jdbc.url jdbc:hive2://localhost:10001/default
Accès aux données pour Datalake
Pour interroger les données via un outil tiers, j'ai commencé par une connexion JDBC basique à Hive via DataGrip, ce qui a généré la capture d'écran du schéma ci-dessus. Après le chargement des données, j'ai créé des vues récupérant des sous-ensembles de données en fonction des valeurs ISIN. La requête de base est identique, comme illustré ci-dessous, à la différence près que les premiers caractères de la valeur ISIN correspondent au code pays ISO2 avec un caractère générique.
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%';J'ai ensuite créé différentes vues filtrant les valeurs ISIN, et j'en ai utilisé une pour comparer l'exécution d'une instruction SQL de jointure et d'une simple sélection dans la vue. Voici les résultats, qui sont globalement similaires. Les deux utilisent MapReduce via Hadoop et nécessitent à peu près le même temps.
SÉLECTIONNER depuis Hive avec JDBC
SÉLECTIONNER en utilisant Join
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth, fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share, fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%' [2025-06-20 18:07:59] 500 rows retrieved starting from 1 in 17 s 530 ms (execution: 17 s 262 ms, fetching: 268 ms) -------------------------------------------------------------------------------------------------- Query ID = claude_paugh_20250620180742_2bf50365-811c-4e27-b61e-7d7181339c9c Total jobs = 2 Stage-5 is filtered out by condition resolver. Stage-1 is selected by condition resolver. Launching Job 1 out of 2 Number of reduce tasks not specified. Estimated from input data size: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> Starting Job = job_1750435886619_0016, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0016/ Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0016 Hadoop job information for Stage-1:
number of mappers: 2; number of reducers: 1
2025-06-20 18:07:47,945 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:07:53,120 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:07:57,237 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:07:58,268 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0016
MapReduce Jobs Launched: Stage-Stage-1: Map: 2 Reduce: 1
HDFS Read: 22020529 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS Total MapReduce CPU Time Spent: 0 msec SÉLECTIONNER en utilisant Afficher
default> select * from default.vw_forecasts_us_2024
2025-06-20 18:12:49] 500 rows retrieved starting from 1 in 18 s 546 ms (execution: 18 s 281 ms, fetching: 265 ms)
----------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620181230_1794bd84-e171-48ed-9915-a95ea5717a21
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0018, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0018/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0018
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:12:36,547 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:12:42,767 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:12:46,875 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:12:47,913 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0018
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 HDFS Read: 22021017 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec[
J'ai testé plusieurs autres scénarios de jointure simples sur une vue, et les résultats étaient similaires. Mon premier constat est qu'à moins d'avoir besoin des tables de base pour d'autres données, certains de ces scénarios de filtrage et/ou d'agrégation sont plus performants avec les jointures de dataframes Apache Spark.
Ce n'est pas surprenant, mais si les utilisateurs ont besoin de ces tables de base pour d'autres cas d'utilisation, leur traitement par Hadoop constitue une alternative viable, compte tenu de la différence d'exécution. Je n'ai pas encore complètement optimisé l'infrastructure Hadoop ; j'aurais donc probablement pu améliorer les temps d'exécution de MapReduce également.
J'ai eu des contraintes de temps pour cet article, je n'ai donc pas eu le temps d'utiliser Apache Drill ou Apache Superset pour une meilleure expérience utilisateur. Cependant, comme les connexions Hive sont disponibles sur les deux, vous devriez pouvoir accéder aux données. Je vais tenter une comparaison entre Drill et Superset, ce qui me permettra de consacrer plus de temps à la comparaison.
L'exemple que j'ai fourni se concentrait exclusivement sur Hive, mais vous pouvez également utiliser des outils supplémentaires comme Drill pour accéder au système de fichiers HDFS et à son contenu. JDBC et Hive exploitent ainsi vos données structurées. De nombreux moteurs de requêtes proposent également des connecteurs, permettant d'interroger le système de fichiers HDFS, d'analyser des fichiers JSON, XML et délimités, et d'utiliser des connexions JDBC. Presto est un exemple courant, tout comme Glue d'AWS, qui permet d'ouvrir différents formats de données.
Dernier point : les modèles de lac de données et de lakehouse ont adopté un modèle de contrôle de la qualité des données pour la réception, le nettoyage, la transformation et la publication des données, utilisé depuis de nombreuses années dans les entrepôts de données. Ce flux de travail est conçu pour assurer la qualité des données à différents niveaux, généralement requis par un lakehouse. Les lacs de données utilisent généralement la réception et le nettoyage « léger » pour présenter les données aux utilisateurs.

