Apache Iceberg, Hadoop & Hive: Öffnen Sie Ihren Datalake (Lakehouse) -> Teil II
- Claude Paugh

- 24. Juni
- 7 Min. Lesezeit
In diesem Artikel demonstriere ich den Benutzerzugriff auf die Hive-Metadaten und die Mechanismen zur Erstellung von Ergebnismengen. Ich hoffe, Ihnen zeigen zu können, wie Sie Datalake- oder Lakehouse-Daten für Benutzer öffnen können.

Ich muss etwas zurückgehen, bevor ich mit einer Einschränkung beginne. In meinem vorherigen Beitrag habe ich ein Einfügebeispiel sowie eine Abfrage meiner Hive-Datenbank bereitgestellt. Ich hätte erwähnen sollen, dass ich aufgrund fehlender Ressourcen einige Leistungsoptimierungen an meiner lokalen Hadoop- und Hive-Installation vornehmen musste.
Das Schreiben und Lesen war lokal besonders langsam, daher habe ich einige Änderungen vorgenommen. Nachfolgend sind die meisten Änderungen aufgeführt, die ich zur Leistungsverbesserung vorgenommen habe. Wichtige Parameter habe ich in den Zeilen kommentiert.
core-site.xml
<property>
<name>io.file.buffer.size</name>
<value>524288</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>350</value>
</property>
<property>
<property>
<name>iceberg.engine.hive.enabled</name>
<value>true</value>
</property>
hdfs-site.xml
<property>
<name>dfs.block.size</name>
<value>134217728</value> # Increased from default
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>40</value> # Increased from default
</property>
<property>
<name>dfs.namenode.avoid.read.slow.datanode</name>
<value>true</value> # Added
</property>mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value> # Added enables parallel compression
</property>
<property>
<name>mapred.map.child.java.opts</name>
<value>-Xmx512M</value> # Reduced from 1GB default
</property>
<property>
<name>mapred.reduce.child.java.opts</name>
<value>-Xmx512M</value>. # Reduced from 1GB default
</property>
<property>
<name>mapreduce.map.cpu.vcores</name>
<value>2</value> # Larger, Default is 1
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx3768m</value>
</property>yarn-site.xml
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value> # New - matches the mapred allocation
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>localhost</value> # New, I have two ethernet interfaces, so avoiding wireless IP
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value> # New, allocate fixed size for scheduler
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>true</value> # New, check physical memory available
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>true</value> # New, check virtual memory available
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
<value>0.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
<value>100.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16392</value> # Increased from default
<description>Physical memory, in MB, to be made available to running containers</description>
</property>Ausgehend vom letzten Beitrag habe ich mit einer recht gängigen Schnittstelle begonnen, die von Datenspeichern für den Zugriff auf ihre Daten verwendet wird. Ich habe eine einfache JDBC-Verbindung zu meiner Hive-Datenbank und dem zugrunde liegenden Data Lake eingerichtet: jdbc:hive2:// localhost:10001/default .
Anschließend habe ich die Daten abgefragt, die ich zuvor über Spark geladen hatte. Ich habe wieder JetBrains DataGrip verwendet. Mein Hive-Schema sah folgendermaßen aus:

Die Tabellen beziehen sich auf Aktien und ich habe die Tabelle „company_forecasts_2024“ mit einem Apache Spark-Job erstellt, der meine Datensätze „company_stocks“ und „forecasts“ mithilfe von Dataframes verbunden hat.
Der Unterschied besteht darin, dass ich zehn Datendateien habe, die „Prognosen“ bilden. Das verwendete PySpark-Skript ist unten aufgeführt. Ich habe einen eher werkseitigen Ansatz gewählt, um das Schema mit den Hive-Tabellen abzugleichen, anstatt wie zuvor fest zu codieren.
Der grundlegende Ablauf (siehe Diagramm unten) besteht darin, die zehn Dateien für Prognosen zu lesen und einen Datenrahmen zu erstellen. Anschließend wird die aktuelle Tabellendefinition „Forecasts“ in der Hive-Datenbank gelesen, da ich meine Iceberg-Eigenschaften für die Speicherung behalten wollte.
Als Nächstes wurde der durch das Lesen der Spark-Datei erstellte Datenrahmen mit der vorhandenen Hive-Tabelle abgeglichen. Ich musste die Typen und die Spaltenreihenfolge an die Hive-Tabelle anpassen, um Daten über eine Datenrahmenmethode zu speichern. Anschließend habe ich die gelesenen Daten an die Tabelle in Hive angehängt.

Im nächsten Schritt lese ich sowohl die aktualisierten „Forecasts“ als auch die „Company_Stocks“ aus Hive und führe eine „Inner Join“-Filterung für nur 2024 Daten durch. Anschließend füge ich bestimmte Elemente aus jeder Tabelle in einen neuen Datenrahmen ein. Der letzte Schritt besteht darin, diesen Datenrahmen in eine neue Tabelle in Hive zu schreiben. Die Gesamtzeit betrug ca. 5 Sekunden. Die Code- und Konfigurationsänderungen spark-defaults.co nf ; in der Conf-Datei gibt es einige Zeilenumbrüche.
PySpark-Skript
from datetime import datetime
from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme
spark = (SparkSession
.builder
.appName("Python Spark SQL Hive integration example")
.master("local[*]")
.config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hive")
.config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.defaultCatalog", "local")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalogImplementation", "hive")
.config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
.config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.warehouse.external.enabled", "true")
.enableHiveSupport()
.getOrCreate())
df = spark.read.csv('hdfs://localhost:9000/input_files/forecast-*.txt',
header=True, inferSchema=True, sep='|')
# spark is an existing SparkSession
hive_jdbc = {
"url": "jdbc:hive2://localhost:10001/default",
"driver": "org.apache.hive.jdbc.HiveDriver",
"table": "forecasts"
}
existing_df = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"))
target_schema = existing_df.schema.names
target_columns = existing_df.schema.fieldNames()
target_columns = [c.replace("forecasts.", "") for c in target_columns]
target_table = hive_jdbc.get("table")
# Create a mapping between upper and lower case column names
column_mapping = {col_name: col_name for col_name in target_columns}
# First rename the columns to match case
for old_col in df.columns:
if old_col.lower() in column_mapping:
print(f"{old_col}, {old_col.lower()}")
df = df.withColumnRenamed(old_col, column_mapping[old_col.lower()])
print(df.printSchema())
# Reorder columns to match existing_df
df = df.select(target_columns)
print(df.count())
forecast_df = spark.read.table("default.forecasts")
company_stocks_df = spark.read.table("default.company_stocks")
jn_df = forecast_df.join(company_stocks_df, [forecast_df.ticker == company_stocks_df.stocksymbol, forecast_df.calendaryear==2024], how="inner")
selected_df = jn_df.select("ticker", "calendaryear", "exchange", "sector", "isin", "debt_growth_pct",
"dividend_per_share_growth", "ebitda_growth_pct", "eps_diluted_growth_pct", "eps_growth_pct", "cash_per_share", "revenue_growth_pct", "net_income_growth_pct")
selected_df.write.saveAsTable("default.company_forecasts_2024", format="iceberg", mode="overwrite")
print(jn_df.count())
print(jn_df.show())spark-defaults.conf
spark.jars.packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri thrift://localhost:9083
spark.sql.warehouse.dir hdfs://ilocalhost:9000/user/hive/warehouse
spark.sql.catalogImplementation hive
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type hive
spark.sql.catalog.spark_catalog.warehouse hdfs://localhost:9000/user/spark/warehouse
spark.sql.catalog.local org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type hive
spark.sql.catalog.local.warehouse /Volumes/ExtShield/opt/local_catalog
spark.sql.hive.metastore.version 2.3.10
spark.sql.hive.metastore.jars builtin
spark.sql.hive.metastore.jars.path $HIVE_HOME/lib/*
spark.sql.hive.metastore.sharedPrefixes org.postgresql
spark.sql.hive.hiveserver2.jdbc.url jdbc:hive2://localhost:10001/default
Datenzugriff für Datalake
Um die Daten über ein Drittanbietertool abzufragen, habe ich mithilfe von DataGrip eine einfache JDBC-Verbindung zu Hive hergestellt, die den Screenshot des obigen Schemas erzeugte. Nach dem Laden der Daten habe ich Ansichten erstellt, die Datenteilmengen basierend auf den ISIN-Werten abrufen. Die grundlegende Abfrage ist dieselbe wie unten dargestellt. Der Unterschied besteht lediglich darin, dass die führenden Zeichen des ISIN-Werts mit dem ISO2-Ländercode mit Platzhalterübereinstimmung übereinstimmen.
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%';Anschließend habe ich verschiedene Ansichten erstellt, die nach den ISIN-Werten filtern. Eine davon habe ich verwendet, um die Ausführung einer Join-SQL-Anweisung mit einer einfachen Auswahl aus der Ansicht zu vergleichen. Die Ergebnisse sind unten aufgeführt. Kurz gesagt, sie sind nahezu identisch. Beide verwenden MapReduce über Hadoop und benötigen ungefähr den gleichen Zeitaufwand.
SELECT aus Hive mit JDBC
SELECT mit Join
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth, fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share, fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%' [2025-06-20 18:07:59] 500 rows retrieved starting from 1 in 17 s 530 ms (execution: 17 s 262 ms, fetching: 268 ms) -------------------------------------------------------------------------------------------------- Query ID = claude_paugh_20250620180742_2bf50365-811c-4e27-b61e-7d7181339c9c Total jobs = 2 Stage-5 is filtered out by condition resolver. Stage-1 is selected by condition resolver. Launching Job 1 out of 2 Number of reduce tasks not specified. Estimated from input data size: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> Starting Job = job_1750435886619_0016, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0016/ Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0016 Hadoop job information for Stage-1:
number of mappers: 2; number of reducers: 1
2025-06-20 18:07:47,945 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:07:53,120 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:07:57,237 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:07:58,268 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0016
MapReduce Jobs Launched: Stage-Stage-1: Map: 2 Reduce: 1
HDFS Read: 22020529 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS Total MapReduce CPU Time Spent: 0 msec SELECT mit View
default> select * from default.vw_forecasts_us_2024
2025-06-20 18:12:49] 500 rows retrieved starting from 1 in 18 s 546 ms (execution: 18 s 281 ms, fetching: 265 ms)
----------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620181230_1794bd84-e171-48ed-9915-a95ea5717a21
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0018, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0018/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0018
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:12:36,547 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:12:42,767 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:12:46,875 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:12:47,913 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0018
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 HDFS Read: 22021017 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec[
Ich habe mehrere andere einfache Join-Szenarien mit einer Ansicht getestet, und die Ergebnisse waren ähnlich. Mein erster Eindruck ist, dass einige dieser Filter- und/oder Aggregatszenarien mit Apache Spark-Dataframe-Joins besser funktionieren, sofern Sie die Basistabellen nicht für andere Daten benötigen.
Das ist keine Überraschung, aber wenn Benutzer diese Basistabellen für andere Anwendungsfälle benötigen, ist die Verarbeitung durch Hadoop eine sinnvolle Alternative. Rechnen Sie jedoch mit den Unterschieden bei der Ausführung. Da ich die Hadoop-Infrastruktur nicht vollständig optimiert habe, hätte ich wahrscheinlich auch die Ausführungszeiten für MapReduce verbessern können.
Da ich für diesen Artikel zeitlich etwas eingeschränkt war, konnte ich weder Apache Drill noch Apache Superset testen, um die Benutzerfreundlichkeit zu verbessern. Da jedoch Hive-Verbindungen für beide verfügbar sind, sollten Sie auf die Daten zugreifen können. Ich werde einen Vergleich zwischen Drill und Superset versuchen, um mehr Zeit für den Vergleich zu haben.
Das von mir angeführte Beispiel konzentrierte sich ausschließlich auf Hive. Sie können aber auch zusätzliche Tools wie Drill verwenden, um auf das HDFS-Dateisystem und Dateiinhalte zuzugreifen. JDBC und Hive nutzen also Ihre strukturierten Daten. Viele ähnliche Abfrage-Engines verfügen ebenfalls über Konnektoren, sodass Sie das HDFS-Dateisystem abfragen, JSON, XML und durch Trennzeichen getrennte Dateien analysieren und JDBC-Verbindungen nutzen können. Presto ist ein gängiges Tool, ebenso wie Glue von AWS, das verschiedene Datenformate öffnen kann.
Ein letzter Punkt: Die Datalake- und Lakehouse-Modelle haben ein Datenqualitätskontrollmodell für die Datenlandung, -bereinigung, -transformation und -freigabe übernommen, das seit vielen Jahren in Data Warehouses verwendet wird. Dieser Workflow zielt auf die Datenqualität auf vielen verschiedenen Ebenen ab, die üblicherweise von einem Lakehouse benötigt wird. Datalakes nutzen am häufigsten die Datenlandung und die „Lite“-Bereinigung, um Daten den Benutzern bereitzustellen.

