Apache Iceberg, Hadoop e Hive: Abra seu Datalake (Lakehouse) -> Parte II
- Claude Paugh
- 24 de jun.
- 7 min de leitura
Neste artigo, demonstrarei o acesso do usuário aos metadados do Hive e os mecanismos usados para criar conjuntos de resultados. Espero poder demonstrar como você pode abrir dados de datalake ou lakehouse para os usuários.

Preciso voltar atrás antes de começar com uma ressalva. Forneci um exemplo de inserção, bem como uma consulta no meu banco de dados Hive, no meu post anterior. Eu deveria ter mencionado que precisava fazer alguns ajustes de desempenho na minha instalação local do Hadoop e do Hive, devido à minha falta de recursos.
A gravação e a leitura estavam particularmente lentas localmente, então fiz algumas alterações. Abaixo estão a maioria das alterações que fiz para ajudar a melhorar o desempenho. Comentei nas linhas os parâmetros relevantes.
core-site.xml
<property>
<name>io.file.buffer.size</name>
<value>524288</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>350</value>
</property>
<property>
<property>
<name>iceberg.engine.hive.enabled</name>
<value>true</value>
</property>
hdfs-site.xml
<property>
<name>dfs.block.size</name>
<value>134217728</value> # Increased from default
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>40</value> # Increased from default
</property>
<property>
<name>dfs.namenode.avoid.read.slow.datanode</name>
<value>true</value> # Added
</property>mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value> # Added enables parallel compression
</property>
<property>
<name>mapred.map.child.java.opts</name>
<value>-Xmx512M</value> # Reduced from 1GB default
</property>
<property>
<name>mapred.reduce.child.java.opts</name>
<value>-Xmx512M</value>. # Reduced from 1GB default
</property>
<property>
<name>mapreduce.map.cpu.vcores</name>
<value>2</value> # Larger, Default is 1
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx3768m</value>
</property>yarn-site.xml
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value> # New - matches the mapred allocation
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>localhost</value> # New, I have two ethernet interfaces, so avoiding wireless IP
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value> # New, allocate fixed size for scheduler
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>true</value> # New, check physical memory available
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>true</value> # New, check virtual memory available
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
<value>0.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
<value>100.0</value> # Disk safety check
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16392</value> # Increased from default
<description>Physical memory, in MB, to be made available to running containers</description>
</property>Dando continuidade ao último post , comecei com uma interface bastante comum usada por repositórios de dados para acessar seus dados. Configurei uma conexão JDBC simples com meu banco de dados Hive e o datalake subjacente, como: jdbc:hive2:// localhost:10001/default .
Em seguida, consultei os dados que havia carregado via Spark anteriormente. Usei o JetBrains DataGrip novamente. Meu esquema do Hive ficou assim:

As tabelas são relacionadas a ações, e criei a tabela "company_forecasts_2024" com um trabalho do Apache Spark que uniu meus conjuntos de dados "company_stocks" e "forecasts" usando dataframes.
É um pouco diferente, pois tenho 10 arquivos de dados que compõem "previsões". O script PySpark que usei está abaixo. Adotei uma abordagem mais "de fábrica" para sincronizar o esquema com as tabelas do Hive, em vez de codificar como fazia antes.
O fluxo básico (diagrama abaixo) é ler os 10 arquivos de previsões, criando um dataframe. Em seguida, ler a definição atual da tabela "forecasts" no banco de dados Hive, já que eu queria manter minhas propriedades do Iceberg para armazenamento.
Em seguida, verifiquei o dataframe criado pela leitura do arquivo Spark em relação à tabela existente do Hive. Precisei alterar os tipos e a ordem das colunas para corresponder à tabela do Hive e salvar os dados por meio de um método de dataframe. Em seguida, anexei os dados lidos do arquivo à tabela no Hive.

Na próxima etapa, leio as informações atualizadas "forecasts" e "company_stocks" do Hive, executo uma filtragem de "inner join" para apenas 2024 dados. Em seguida, mesclo elementos específicos de cada tabela em um novo dataframe. A etapa final é gravar esse dataframe em uma nova tabela no Hive --> o tempo total foi de cerca de 5 segundos. As alterações no código e na configuração spark-defaults.co nf ; há alguma quebra de linha no arquivo conf.
Script PySpark
from datetime import datetime
from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme
spark = (SparkSession
.builder
.appName("Python Spark SQL Hive integration example")
.master("local[*]")
.config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hive")
.config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.defaultCatalog", "local")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalogImplementation", "hive")
.config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
.config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
.config("spark.sql.warehouse.external.enabled", "true")
.enableHiveSupport()
.getOrCreate())
df = spark.read.csv('hdfs://localhost:9000/input_files/forecast-*.txt',
header=True, inferSchema=True, sep='|')
# spark is an existing SparkSession
hive_jdbc = {
"url": "jdbc:hive2://localhost:10001/default",
"driver": "org.apache.hive.jdbc.HiveDriver",
"table": "forecasts"
}
existing_df = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"))
target_schema = existing_df.schema.names
target_columns = existing_df.schema.fieldNames()
target_columns = [c.replace("forecasts.", "") for c in target_columns]
target_table = hive_jdbc.get("table")
# Create a mapping between upper and lower case column names
column_mapping = {col_name: col_name for col_name in target_columns}
# First rename the columns to match case
for old_col in df.columns:
if old_col.lower() in column_mapping:
print(f"{old_col}, {old_col.lower()}")
df = df.withColumnRenamed(old_col, column_mapping[old_col.lower()])
print(df.printSchema())
# Reorder columns to match existing_df
df = df.select(target_columns)
print(df.count())
forecast_df = spark.read.table("default.forecasts")
company_stocks_df = spark.read.table("default.company_stocks")
jn_df = forecast_df.join(company_stocks_df, [forecast_df.ticker == company_stocks_df.stocksymbol, forecast_df.calendaryear==2024], how="inner")
selected_df = jn_df.select("ticker", "calendaryear", "exchange", "sector", "isin", "debt_growth_pct",
"dividend_per_share_growth", "ebitda_growth_pct", "eps_diluted_growth_pct", "eps_growth_pct", "cash_per_share", "revenue_growth_pct", "net_income_growth_pct")
selected_df.write.saveAsTable("default.company_forecasts_2024", format="iceberg", mode="overwrite")
print(jn_df.count())
print(jn_df.show())spark-defaults.conf
spark.jars.packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri thrift://localhost:9083
spark.sql.warehouse.dir hdfs://ilocalhost:9000/user/hive/warehouse
spark.sql.catalogImplementation hive
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type hive
spark.sql.catalog.spark_catalog.warehouse hdfs://localhost:9000/user/spark/warehouse
spark.sql.catalog.local org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type hive
spark.sql.catalog.local.warehouse /Volumes/ExtShield/opt/local_catalog
spark.sql.hive.metastore.version 2.3.10
spark.sql.hive.metastore.jars builtin
spark.sql.hive.metastore.jars.path $HIVE_HOME/lib/*
spark.sql.hive.metastore.sharedPrefixes org.postgresql
spark.sql.hive.hiveserver2.jdbc.url jdbc:hive2://localhost:10001/default
Acesso a dados para Datalake
Para consultar os dados por meio de uma ferramenta de terceiros, comecei com uma conexão JDBC básica com o Hive usando o DataGrip, que produziu a captura de tela do esquema acima. Após o carregamento de dados, criei visualizações que recuperaram subconjuntos de dados com base nos valores ISIN. A consulta básica é a mesma, mostrada abaixo, a diferença é que apenas os caracteres iniciais do valor ISIN correspondem ao código ISO2 do país com uma correspondência curinga.
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth,
fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share,
fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share
from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%';Posteriormente, criei diferentes visualizações filtrando os valores ISIN e usei uma delas para comparar a execução de uma instrução SQL de junção com uma simples seleção na visualização. Abaixo estão os resultados e, em resumo, são praticamente os mesmos. Ambas usando MapReduce via Hadoop e gastando aproximadamente o mesmo tempo.
SELECIONE do Hive com JDBC
SELECIONE usando Join
select cs.asofdate, cs.stocksymbol, cs.isin, cs.companyname, cs.country, fc.estimatedate, fc.debt_growth_pct, fc.dividend_per_share_growth, fc.eps_diluted_growth_pct, fc.five_year_net_income_growth_per_share, fc.five_year_dividend_per_share_growth, fc.five_year_operating_cashflow_growth_per_share, fc.five_year_revenue_growth_per_share, fc.five_year_shareholders_equity_growth_per_share from company_stocks cs left join forecasts fc on (cs.stocksymbol = fc.ticker and fc.calendaryear = '2024') where cs.isin like 'US%' [2025-06-20 18:07:59] 500 rows retrieved starting from 1 in 17 s 530 ms (execution: 17 s 262 ms, fetching: 268 ms) -------------------------------------------------------------------------------------------------- Query ID = claude_paugh_20250620180742_2bf50365-811c-4e27-b61e-7d7181339c9c Total jobs = 2 Stage-5 is filtered out by condition resolver. Stage-1 is selected by condition resolver. Launching Job 1 out of 2 Number of reduce tasks not specified. Estimated from input data size: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> Starting Job = job_1750435886619_0016, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0016/ Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0016 Hadoop job information for Stage-1:
number of mappers: 2; number of reducers: 1
2025-06-20 18:07:47,945 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:07:53,120 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:07:57,237 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:07:58,268 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0016
MapReduce Jobs Launched: Stage-Stage-1: Map: 2 Reduce: 1
HDFS Read: 22020529 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS Total MapReduce CPU Time Spent: 0 msec SELECIONE usando Exibir
default> select * from default.vw_forecasts_us_2024
2025-06-20 18:12:49] 500 rows retrieved starting from 1 in 18 s 546 ms (execution: 18 s 281 ms, fetching: 265 ms)
----------------------------------------------------------------------------------------------------
Query ID = claude_paugh_20250620181230_1794bd84-e171-48ed-9915-a95ea5717a21
Total jobs = 2
Stage-5 is filtered out by condition resolver.
Stage-1 is selected by condition resolver.
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1750435886619_0018, Tracking URL = http://localhost:8088/proxy/application_1750435886619_0018/
Kill Command = /opt/homebrew/Cellar/hadoop/3.4.1/libexec/bin/mapred job -kill job_1750435886619_0018
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2025-06-20 18:12:36,547 Stage-1 map = 0%, reduce = 0%
2025-06-20 18:12:42,767 Stage-1 map = 50%, reduce = 0%
2025-06-20 18:12:46,875 Stage-1 map = 100%, reduce = 0%
2025-06-20 18:12:47,913 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1750435886619_0018
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 HDFS Read: 22021017 HDFS Write: 1557723 HDFS EC Read: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec[
Testei vários outros cenários de junção simples em uma visualização e os resultados foram semelhantes. Minha conclusão inicial é que, a menos que você precise das tabelas base para outros dados, alguns desses cenários de filtro e/ou agregação apresentam melhor desempenho usando junções de dataframe do Apache Spark.
Isso não é nenhuma surpresa, mas se os usuários precisarem dessas tabelas base para outros casos de uso, ter o Hadoop processando-as é uma alternativa viável, mas espere a diferença na execução. Não otimizei completamente a infraestrutura do Hadoop, então provavelmente poderia ter melhorado os tempos de execução do MapReduce também.
Tive algumas restrições de tempo para escrever este artigo, então nunca usei o Apache Drill ou o Apache Superset para uma melhor experiência do usuário, mas como as conexões Hive estão disponíveis em ambos, você deve conseguir acessar os dados. Vou tentar um confronto entre Drill e Superset, onde posso dedicar mais tempo à comparação.
O exemplo que forneci focou exclusivamente no Hive, mas você também pode usar ferramentas adicionais como o Drill para acessar o sistema de arquivos HDFS e o conteúdo dos arquivos, de modo que o JDBC e o Hive busquem os dados estruturados que você possui. Existem muitos mecanismos de consulta como esse que também possuem conectores, permitindo que você consulte o sistema de arquivos HDFS, analise JSON, XML e arquivos delimitados, além de usar conexões JDBC. O Presto é um exemplo comum, assim como o Glue da AWS, que pode "abrir" diferentes formatos de dados.
Uma última observação: os modelos de datalake e lakehouse adotaram, de forma mais ampla, um modelo de controle de qualidade de dados para aterrissagem, depuração, transformação e liberação de dados, que tem sido usado há muitos anos em data warehouses. Esse fluxo de trabalho visa, na verdade, a qualidade dos dados em diversos níveis, o que geralmente é exigido por um lakehouse. Os datalakes costumam usar a aterrissagem e a depuração "leve" para disponibilizar os dados aos usuários.

