Apache Iceberg, Hadoop, & Hive: Open your Datalake (Lakehouse) -> Part II
- Claude Paugh
- Jun 24
- 7 min read
Updated: Jul 7
In this article I will demonstrate user access to Hive metadata and the mechanisms used for creating result sets. The goal is to show how you can open up datalake or lakehouse data to users.

Before I begin, I need to backtrack with a caveat. In my previous post I provided an insert example, as well as a query against my Hive database. I should have noted that I needed to do some performance tuning on my local Hadoop and Hive installation, due to my limited resources.
Writes and reads were especially slow locally, so I made some changes. Below are most of the changes I made to help improve performance; I have added comments to the lines with notable parameters.
core-site.xml
<property>
<name>io.file.buffer.size</name>
<value>524288</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>350</value>
</property>
<property>
<name>iceberg.engine.hive.enabled</name>
<value>true</value>
</property>
hdfs-site.xml
<property>
<name>dfs.block.size</name>
<value>134217728</value> # Increased from default
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>40</value> # Increased from default
</property>
<property>
<name>dfs.namenode.avoid.read.slow.datanode</name>
<value>true</value> # Added
</property>
mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value> # Added; enables parallel compression
</property>
<property>
<name>mapred.map.child.java.opts</name>
<value>-Xmx512M</value> # Reduced from 1GB default
</property>
<property>
<name>mapred.reduce.child.java.opts</name>
<value>-Xmx512M</value> # Reduced from 1GB default
</property>
<property>
<name>mapreduce.map.cpu.vcores</name>
<value>2</value> # Larger, Default is 1
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx3768m</value>
</property>
yarn-site.xml
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value> # New - matches the mapred allocation
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>localhost</value> # New, I have two ethernet interfaces, so avoiding wireless IP
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value> # New, allocate fixed size for scheduler
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>true</value> # New, check physical memory available
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>true</value> # New, check virtual memory available
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
<value>0.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
<value>100.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16392</value> # Increased from default
<description>Physical memory, in MB, to be made available to running containers</description>
</property>
Going forward from the last post, I started off with a fairly common interface used by data stores to access their data. I set up a simple JDBC connection to my Hive database, and the underlying datalake, as: jdbc:hive2://localhost:10001/default.
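If you want to reach the same endpoint from code rather than a GUI tool, a minimal sketch using the JayDeBeApi package is below. The package choice and the jar path are my assumptions for illustration, not something the rest of the setup depends on; point the jar path at your Hive JDBC standalone driver.
import jaydebeapi

# Hedged sketch: connect to HiveServer2 over JDBC from Python.
conn = jaydebeapi.connect(
    "org.apache.hive.jdbc.HiveDriver",          # same driver class the PySpark script uses later
    "jdbc:hive2://localhost:10001/default",     # the connection string from above
    jars="/path/to/hive-jdbc-standalone.jar"    # placeholder path; add driver_args=["user", "password"] if your HiveServer2 requires authentication
)
try:
    curs = conn.cursor()
    curs.execute("select count(*) from company_stocks")
    print(curs.fetchall())
    curs.close()
finally:
    conn.close()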
I then went about querying the data that I had loaded via Spark previously. I was using JetBrains DataGrip again. My Hive schema looked like this:

The tables are stock related, and I created the "company_forecasts_2024" table with an Apache Spark job that joined my "company_stocks" and "forecasts" datasets using dataframes.
It's slightly different in that I have 10 datafiles that make up "forecasts". The PySpark script I used is below. I took a more "factory" approach to syncing the schema to match the Hive tables, instead of hardcoding it as I did previously.
The basic flow (diagram below) is to read the 10 forecast files and create a dataframe, then read the current "forecasts" table definition from the Hive database, since I wanted to keep my Iceberg storage properties.
Next, I checked the dataframe created by the Spark file read against the existing Hive table. I needed to change the types and column order to match the Hive table in order to save the data via a dataframe method, and then appended the file data to the table in Hive.

In the next step, I read both the updated "forecasts" and "company_stocks" tables from Hive and perform an inner join, filtering for only 2024 data. Next, I merge specific elements from each table into a new dataframe. The final step is writing that dataframe to a new table in Hive; total time was about 5 seconds. The code and the configuration changes to spark-defaults.conf are below; there is some line wrapping in the conf file.
PySpark Script
from datetime import datetime
from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme
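# Build a Spark session wired to the Hive metastore (thrift://localhost:9083) and the Iceberg catalog on HDFS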
spark = (SparkSession
.builder
.appName("Python Spark SQL Hive integration example")
.master("local[*]")
.config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hive")
.config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.defaultCatalog", "local")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalogImplementation", "hive")
.config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
.config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.warehouse.external.enabled", "true")
.enableHiveSupport()
.getOrCreate())
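# Read the pipe-delimited forecast files from HDFS into a single dataframe, inferring the schema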
df = spark.read.csv('hdfs://localhost:9000/input_files/forecast-*.txt',
header=True, inferSchema=True, sep='|')
# spark is an existing SparkSession
hive_jdbc = {
"url": "jdbc:hive2://localhost:10001/default",
"driver": "org.apache.hive.jdbc.HiveDriver",
"table": "forecasts"
}
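# Read the existing Hive "forecasts" table over JDBC to capture the target column names and ordering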
existing_df = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"))
target_schema = existing_df.schema.names
target_columns = existing_df.schema.fieldNames()
target_columns = [c.replace("forecasts.", "") for c in target_columns]
target_table = hive_jdbc.get("table")
# Build a lookup of the target column names, keyed by their (already lower-case) Hive names
column_mapping = {col_name: col_name for col_name in target_columns}
# First rename the columns to match case
for old_col in df.columns:
    if old_col.lower() in column_mapping:
        print(f"{old_col}, {old_col.lower()}")
        df = df.withColumnRenamed(old_col, column_mapping[old_col.lower()])
df.printSchema()
# Reorder columns to match existing_df
df = df.select(target_columns)
print(df.count())
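# Re-read both tables from the Hive catalog and inner-join forecasts to company stocks for calendar year 2024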
forecast_df = spark.read.table("default.forecasts")
company_stocks_df = spark.read.table("default.company_stocks")
jn_df = forecast_df.join(company_stocks_df, [forecast_df.ticker == company_stocks_df.stocksymbol, forecast_df.calendaryear==2024], how="inner")
selected_df = jn_df.select("ticker", "calendaryear", "exchange", "sector", "isin", "debt_growth_pct",
"dividend_per_share_growth", "ebitda_growth_pct", "eps_diluted_growth_pct", "eps_growth_pct", "cash_per_share", "revenue_growth_pct", "net_income_growth_pct")
selected_df.write.saveAsTable("default.company_forecasts_2024", format="iceberg", mode="overwrite")
print(jn_df.count())
jn_df.show()
spark-defaults.conf
spark.jars.packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri thrift://localhost:9083
spark.sql.warehouse.dir hdfs://localhost:9000/user/hive/warehouse
spark.sql.catalogImplementation hive
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type hive
spark.sql.catalog.spark_catalog.warehouse hdfs://localhost:9000/user/spark/warehouse
spark.sql.catalog.local org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type hive
spark.sql.catalog.local.warehouse /Volumes/ExtShield/opt/local_catalog
spark.sql.hive.metastore.version 2.3.10
spark.sql.hive.metastore.jars builtin
spark.sql.hive.metastore.jars.path $HIVE_HOME/lib/*
spark.sql.hive.metastore.sharedPrefixes org.postgresql
spark.sql.hive.hiveserver2.jdbc.url jdbc:hive2://localhost:10001/default
Data Access for Datalake
To query the data via a third-party tool, I started with a basic JDBC connection to Hive using DataGrip, which produced the screenshot of the schema above. After the data load, I created views that retrieved subsets of data based on the ISIN values. The base query, shown below, is the same for each view; the only difference is that the leading characters of the ISIN value are matched against an ISO2 country code with a wildcard.
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%';
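For reference, here is a hedged sketch of how one of these views could be defined. I created mine over the JDBC connection in DataGrip, but for illustration this version issues the same statement through the Spark session from the script above, using the view name that appears in the results below.
# Hedged sketch: create the US view from the base query, with the ISIN prefix as the only per-country change
spark.sql("""
    CREATE VIEW IF NOT EXISTS default.vw_forecasts_us_2024 AS
    SELECT cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country,
           fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
           fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share,
           fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
           fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
    FROM company_stocks cs
    LEFT JOIN forecasts fc
      ON (cs.stocksymbol = fc.ticker AND fc.calendaryear = '2024')
    WHERE cs.isin LIKE 'US%'
""")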
I subsequently created different views filtering on the ISIN values, and I used one to compare how a join SQL statement executed vs. a simple select from the view. Below are the results; in a nutshell they are pretty much the same, both using MapReduce via Hadoop and spending about the same amount of time.
SELECT from Hive with JDBC
SELECT using Join
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth, fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share, fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%'
[2025-06-20 18:07:59] 500 rows retrieved starting from 1 in 17 s 530 ms (execution: 17 s 262 ms, fetching: 268 ms)
----------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620180742_2bf50365-811c-4e27-b61e-7d7181339c9c
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0016, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0016/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0016
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:07:47,945 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:07:53,120 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:07:57,237 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:07:58,268 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0016
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 HDFS Read: 22020529 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
SELECT using View
default> select * from default.vw_forecasts_us_2024
[2025-06-20 18:12:49] 500 rows retrieved starting from 1 in 18 s 546 ms (execution: 18 s 281 ms, fetching: 265 ms)
----------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620181230_1794bd84-e171-48ed-9915-a95ea5717a21
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0018, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0018/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0018
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:12:36,547 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:12:42,767 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:12:46,875 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:12:47,913 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0018
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 HDFS Read: 22021017 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
I tested several other simple join scenarios against a view, and the results were similar. My initial takeaway is that unless you need the base tables for other data, some of these filter and/or aggregate scenarios perform better using Apache Spark dataframe joins.
That's not a surprise, but if users need those base tables for other use cases, having Hadoop process them is a viable alternative; just expect the difference in execution time. I have not completely optimized the Hadoop infrastructure, so I probably could have improved the execution times for MapReduce as well.
I had some time constraints on this article, so I never got around to Apache Drill or Apache Superset for a better user experience, but since Hive connections are available in both, you should be able to access the data. I am planning a Drill vs. Superset showdown when I can dedicate more time to the comparison.
The example I provided focused exclusively on Hive, but you can also use additional tools like Drill to access the HDFS file system and file contents, whereas JDBC and Hive go after the structured data you have. There are many query engines with connectors that let you query the HDFS file system, parse JSON, XML, and delimited files, and use JDBC connections. Presto is a common one, as is AWS Glue, which can "open up" different data formats.
One last thing: the datalake and lakehouse models have loosely adopted a data quality control model, used for many years in data warehouses, of landing, scrubbing, transforming, and releasing data. That workflow is really intended to provide data quality at many different levels, which a lakehouse usually requires. Datalakes most commonly stop at landing and "lite" scrubbing to get data in front of users.
Update: 07/04/2025
Cloudflare has a beta service offering of Apache Iceberg available.