top of page

Apache Iceberg、Hadoop 和 Hive:打开你的数据湖(Lakehouse)-> 第一部分

已更新:8月18日

在之前的一篇文章中,我简要概述了数据湖 (Data Lake) 和湖屋 (Lake House) 的区别标准。数据管理和组织是湖屋的关键要素,而数据湖的不足之处以及更高的数据输入速度是其主要特征。现在,我将探讨如何让数据湖或湖屋这两种模式都可供用户访问。我将把这个主题分为两部分,第一部分侧重于基础设施,第二部分则关注数据内容和访问。
数据湖和 Hive
Datalake and Hive

尽管存在差异,但在部署基础架构时,您可以向 Lakehouse 或 Datalake 架构迈出一些重大步伐。显然,它不处理构成数据管理实践的任何其他流程,但您可以自行定位,朝着这个方向发展。


我之前关于Apache Iceberg 的文章指出了 Iceberg 的数据管理功能,尤其是在模式管理方面。因此,我利用 Iceberg 构建了我的迷你模型架构。


对于我的数据处理平台,我坚持使用开源。但我认为也没有更好的商业产品。平台决策有些复杂。我使用并部署了DASKApache Spark作为能够在 CPU(由于 $$$ 而非常受欢迎)和 GPU 引擎上并行处理的处理引擎。显然,需要使用 GPU 供应商的库来扩展其中任何一个,但数据处理有点浪费 GPU——它们不适合这项任务。


我决定使用Apache Spark ,因为它拥有庞大的社区和供应商支持。当节点数量只有个位数时,它的部署和运维速度非常快——除非你使用能够快速扩展的云服务提供商。Spark 也不需要编写 Python 函数来实现并行性。由于我使用的是 Iceberg(和 Hadoop),它内置了与 Spark 的集成,这应该会让集成更加顺畅(我认为)。


最后,为了方便用户查询数据,我选择了 Apache Hive 和 Drill,因为它们集成良好。当然,还有很多其他开源工具和商业产品,可能更适合你,具体取决于你的具体需求,但这款产品可以让你无需支付许可费用即可上手。


以下是这些选择的描述,其中箭头代表数据流(理想情况下):

预期数据流
Expected Data Flow

数据湖:设置 Apache Hadoop 和 Apache Iceberg

Hadoop 需要安装 JDK v8 或更高版本。我使用Homebrew安装了openjdk@17 ,因为这个版本支持我上面列出的所有 Apache 产品。


根据您使用的平台,您可以选择通过下载手动安装,或使用包管理器。我还使用了Homebrew来安装 Hadoop:

brew install hadoop
cd "/opt/homebrew/Cellar/hadoop/3.3.6/libexec/etc/hadoop"

vi core-site.xml 

我在之前关于该主题的文章中已经完成了 Iceberg 安装,但基本上你的core-site.xml 文件应该类似于下面列表中的内容。我已将需要特别注意的项目加粗,并且由于我使用的是本地设置,因此我允许匿名身份验证。在大多数情况下身份验证应该启用/必需。主要的先决条件是拥有安装Hadoop 所需的文件系统位置和可用空间。

核心站点.xml

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>/Volumes/ExtShield/opt/hdfs/tmp</value>
    <description>A base for other temporary directories</description>             
  </property>
<property>
      <name>hadoop.http.filter.initializers</name>
      <value>org.apache.hadoop.security.AuthenticationFilterInitializer</value>
      <description>
                    Authentication for Hadoop HTTP web-consoles
                    add to this property the org.apache.hadoop.security.AuthenticationFilterInitializer initializer class.
      </description>
</property>
<property>
      <name>hadoop.http.authentication.type</name>
      <value>simple</value>
      <description>
                    Defines authentication used for the HTTP web-consoles. 
                    The supported values are: simple | kerberos | #AUTHENTICATION_HANDLER_CLASSNAME#. 
                    The dfeault value is simple.
      </description>
</property>
<property>
      <name>hadoop.http.authentication.token.validity</name>
      <value>36000</value>
      <description>
                    Indicates how long (in seconds) an authentication token is valid before it has to be renewed. 
                    The default value is 36000.
      </description>
</property>
<property>
      <name>hadoop.http.authentication.standard.config.path</name>
      <value>/Users/claude_paugh/hadoop/auth.conf</value>
      <description>
                    The signature secret file for signing the authentication tokens. 
                    The same secret should be used for all nodes in the cluster, JobTracker, NameNode, DataNode and TastTracker. 
                    The default value is $user.home/hadoop-http-auth-signature-secret. 
                    IMPORTANT: This file should be readable only by the Unix user running the daemons.
      </description>
</property>
<property>
      <name>hadoop.http.authentication.simple.anonymous.allowed</name>
      <value>true</value>
      <description>
                     Indicates if anonymous requests are allowed when using ‘simple’ authentication. 
                     The default value is true
      </description>
</property>
<property>
      <name>hadoop.http.authentication.signature.secret.file</name>
      <value>/Users/claude_paugh/hadoop/hadoop-http-auth-signature-secret</value>
      <description>
                    The signature secret file for signing the authentication tokens. 
                    The same secret should be used for all nodes in the cluster, JobTracker, NameNode, DataNode and TastTracker. 
                    The default value is $user.home/hadoop-http-auth-signature-secret. 
                    IMPORTANT: This file should be readable only by the Unix user running the daemons.
      </description>
</property>
<property>
	<name>iceberg.engine.hive.enabled</name>
	<value>true</value>
</property>
</configuration>

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
     <name>dfs.replication</name>
     <value>1</value>
  </property>
  <property>
     <name>dfs.permissions</name>
     <value>false</value>
  </property>
  <property>
     <name>dfs.namenode.name.dir</name>
     <value>file:/Volumes/ExtShield/opt/hdfs/namenode/data</value>
  </property>
    <property>
     <name>dfs.namenode.checkpoint.dir</name>
     <value>file:/Volumes/ExtShield/opt/hdfs/namenode/checkpoint</value>
  </property>
  <property>
     <name>dfs.datanode.data.dir</name>
     <value>file:/Volumes/ExtShield/opt/hdfs/datanode/data</value>
  </property>
</configuration>

mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
 <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>
</configuration>

纱线站点..xml

<?xml version="1.0"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_HOME,PATH,LANG,TZ,HADOOP_MAPRED_HOME</value>
    </property>
</configuration>

上面的配置文件列表也反映了我在 Apache Iceberg 中使用的配置。接下来,我在主目录中设置了.bashrc脚本,以包含 Hadoop 的变量。如果.bashrc在打开 shell 会话时未运行,则需要“ source ~/.bashrc ”手动运行它。包含的变量是运行(启动/关闭、作业)Iceberg/Hadoop、Hive 和 Spark 产品所必需的:

.bashrc——仅限 Hadoop

export PYTHON_BIN="/usr/local/bin"
export JAVA_HOME="/opt/homebrew/opt/openjdk@21"
export PYTHONPATH=$PYTHON_BIN
eval "$(/opt/homebrew/bin/brew shellenv)"
export PYTHONPATH=$PYTHONPATH:/Users/claude_paugh/airflow/arf/site-packages/trading-0.1.0b0.dist-info

export HDFS_NAMENODE_USER="claude_paugh"
export HDFS_DATANODE_USER="claude_paugh"
export HDFS_SECONDARYNAMENODE_USER="claude_paugh"
export HDF5_USE_FILE_LOCKING="FALSE"

export HADOOP_HEAPSIZE=2048
export HADOOP_HOME="/opt/homebrew/Cellar/hadoop/3.4.1/libexec"
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_CLIENT_OPTS="-Di-Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE"
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

export PATH=$PATH:$PYTHONPATH:$JAVA_HOME:$HADOOP_HOME:$HADOOP_HOME/bin

您可以使用位于同一位置的$HADOOP_HOME/sbin/start-all.shstop-all.sh来启动和关闭 Hadoop。如果您像我一样使用 Python 库,则需要通过 pip 或 conda 安装 pyiceberg 包。此时,如果“ start-all.sh ”运行正常,请在浏览器中访问http://localhost:9870/网址,您应该会看到类似以下内容:

Hadoop主页用户界面
Hadoop UI

我之前已经下载Apache Iceberg 的 JAR,因此此时我可以将 Iceberg jar 添加到我的类路径(或$HADOOP_HOME/lib ),并且我应该可以完全使用 Iceberg + Hadoop 功能。


设置 Apache Spark

我的下一步是设置 Spark 独立版本。我决定尝试 4.0.0 版本,并再次使用Homebrew 进行安装。它与我的 Hadoop/Iceberg 版本配合良好,但在安装 Apache Hive 4.0.1 时,我开始遇到一些与目录识别相关的间歇性问题,这些问题在其他人的版本中也出现过。因此,我改用 Spark 3.5.6,并再次使用 Homebrew。与上面的 Hadoop 安装类似, 启动all.sh 并在$SPARK_HOME/sbin目录中运行 stop-all.sh 。启动后,您应该能够访问http://127.0.0.1:8080/并看到以下内容:

Spark UI
Spark Standalone UI

我还创建并定制了$SPARK_HOME/conf/ spark-defaults.co nf ,它在 Spark 启动时读取,它包含以下内容:

spark.jars.packages                                             org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions                                           org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri                                    thrift://localhost:9083
spark.sql.warehouse.dir                                         hdfs://localhost:9000/user/hive/warehouse
spark.sql.catalogImplementation                                 hive
spark.sql.catalog.spark_catalog                                 org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type                            hive
spark.sql.catalog.spark_catalog.warehouse                       hdfs://localhost:9000/user/spark/warehouse

这些值指向的是 Hive 的使用情况和集成情况。我还创建了(从模板)并修改了spark-env.sh $SPARK_HOME/conf目录中的脚本包含以下内容:

export SPARK_MASTER_HOST=127.0.0.1
export SPARK_MASTER_PORT=7077
 
# Options read in any cluster manager using HDFS
export HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
export HADOOP_HOME=$HADOOP_HOME

# Pyspark
PYSPARK_DRIVER_PYTHON=python
PYSPARK_PYTHON=python

此时,我已经拥有了一个可以正常运行的 Spark 环境,我测试了 Scala 和 Pyspark shell 能否从命令行启动,结果成功了,并且我运行了一些语句。然后,我为 Pyspark 执行了 pip 安装,因为我不打算在测试时使用 shell。我通过运行一个简单的脚本来测试 Spark 安装的其余部分,该脚本打开一个 SparkSession,并从 HDFS 位置将一个带分隔符的文件读入数据帧,并将 Pyspark 导入到脚本中。我还在.bashrc文件中添加了以下变量:

export SPARK_REMOTE="sc://localhost"
export SPARK_HOME="/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/libexec"
export PYTHONPATH=$HADDOOP_HOME:$SPARK_HOME/jars:$PYTHONP$
export PATH=$PATH:$HADOOP_HOME:$SPARK_HOME:$HADOOP_HDFS_HOME

设置 Apache Hive

正如我之前所说,我最初使用的是 Apache Spark 4.0.0 和 Apache Hive 4.0.1,但遇到一些问题后,我将 Spark 降级到 3.5.6,并安装了 Apache Hive 4.0.0 版本。您可以在这里找到 Hive 的历史版本。我下载了apache-hive-4.0.0-bin.tar.gz文件并将其移动到目标位置,然后在 MacBook 上的 shell 中运行:

gunzip apache-hive-4.0.0-bin.tar.gz
tar -xvf apache-hive-4.0.0-bin.tar 

然后,我将以下环境变量添加到我的.bashrc中:

export HIVE_OPTS="-hiveconf mapreduce.map.memory.mb=8192 -hiveconf mapreduce.reduce.memory.mb=5120"
export HIVE_HOME="/Volumes/ExtShield/opt/apache-hive-4.0.0-bin"
export HIVE_CONF_DIR=$HIVE_HOME/conf
export HIVE_SERVER2_THRIFT_BIND_HOST="localhost"
export HIVE_SERVER2_THRIFT_PORT="10001"
export PYTHONPATH --> add $HIVE_HOME/lib
export PATH --> add $HIVE_HOME

内存限制是为了避免“内存不足”错误,其余部分的含义不言自明,具体含义请参考 Hive 文档。 “HIVE_SERVER2”变量是即将进行的安装所必需的。


我之前曾使用远程 PostgreSQL 数据库来构建 Iceberg 目录,因此我也沿用了同样的方法。我重新使用部署中安装的 Postgres,并将Postgres JDBC 驱动程序复制到$HIVE_HOME/lib位置,以便将其添加到 Hive 类路径中。


我首先阅读了最新的 Hive文档和 AdminManual,然后下载了 Hive 4.0.0 版本进行 tarball 安装。我按照说明操作,并根据提示修改了.bashrc 文件和配置文件,然后开始安装Hive Metastore


正如我上面指出的,我打算使用带有 PostgreSQL 的远程 Metastore,因此我使用DataGrip通过 JDBC 连接手动创建了 Postgres 数据库,并将其命名为metacatalog 然后,根据提供的模板文件创建了hive-site.xml文件,并开始更新配置,用 Postgres 替换了 MySQL(详见网站文档)。您还会注意到,我添加了一些用于设置HiveServer2 的参数,详情如下:

配置参数

配置值

评论

javax.jdo.option.ConnectionURL

jdbc:postgresql://10.0.0.30:5439/metacatalog?ssl=false;create=false

元数据存储在 PostgreSQL 服务器中

javax.jdo.option.ConnectionDriverName

JDBC 驱动类

javax.jdo.option.ConnectionUserName

<username>

连接 Postgres 的用户名

javax.jdo.option.ConnectionPassword

<password>

连接 Postgres 的密码

hive.metastore.warehouse.dir

Hive 表的默认位置 (Hadoop)

要绑定 Metastore 服务的主机名。如果为空,则使用“localhost”。此配置适用于 Hive 4.0.0 及以上版本。

hive.metastore.uris

Thrift Metastore 服务器的主机和端口。如果指定了hive.metastore.thrift.bind.host ,则主机应与该配置相同。有关更多信息,请参阅动态服务发现配置参数。

我相信大家都会注意到,在大多数安全设置中,将用户名和密码以明文形式存储在hive-site.xml文件中是不可行的。我预计大多数组织都会使用SAML 2.0和单点登录。


接下来,我在配置更改后在后台将 Hive Metastore 作为服务启动。

$HIVE_HOME/bin/hive --service metastore &

我最初尝试在 JDBC URL 上使用“ create=true ”配置选项来启动元存储,但只取得了部分成功。因此,我改为使用“ create=false ”配置选项,并使用schema-tool执行手动部署。我成功启动了服务,并使用该工具部署了 Hive 目录表:

Postgres Hive 元目录
Postgres Hive Meta-catalog

在 Hive 4.0.0 附带的 Catalog 版本中,假设数据库启动时为空,那么数据库中应该有 83 个表。在使用 schema 工具修改参数并启动 Metastore 服务后,启动日志中显示一切正常,因此我认为接下来应该进行 Hive 服务器的部署。


我已经在上面添加了部分参数 HiveServer2,但为了完成任务,我还需要对hive-site.xml做一些修改。首先,我想以“HTTP”模式运行,而不是二进制模式,因此我将xml 中的hive.server2.transport.mode更新为“ http ”。然后,我在 hive-site.xml 文件中添加了最低要求值,包括在“http 模式”下运行的值(表格见链接),并执行了以下操作:

  1. 禁用 SSL

  2. 使用带有 http 参数的 JDBC URL: jdbc:hive2:// localhost:10001 / default; transportMode = http; httpPath = cliservice

  3. 日志配置

  4. 匿名身份验证

  5. Hive 的临时目录配置


然后我启动了服务:

$HIVE_HOME/bin/hive --service hiveserver2

访问http://localhost:10002 ,我看到以下内容


配置单元服务器2
HiveServer2 Home UI

此时,我似乎至少在运行,因此我安装并打开了DBeaver来验证 Hive 的功能。我使用 DBeaver 创建了与 Hive 的 JDBC 连接,并添加了一个无需身份验证的 Hive 数据源,使用的 URL 为上面提供的:

Hive JDBC Connection
Hive JDBC Connection

我连接并看到以下内容:

Connected to Hive
Connected to Hive

打开 HiveServer2 的 URL:

Hive UI Output
Hive UI Output

我已经验证了与 Hive 数据库的连接,接下来我想创建一个表来验证 Hive + Iceberg + Hadoop 的集成。我找到了在Apache Iceberg 文章中使用的“创建表”语句并做了一些简单的修改,使其包含 Hive/Iceberg 的行格式 serde:


运行脚本:

创建表
Created Table
Hive 目录表
Hive Catalog of Table





















Hive UI 输出:

单击“Drilldown”链接可以获取语句运行的执行计划和操作日志:

创建表的 Hive UI 输出
Hive UI Output for Create Table

包含 Apache Iceberg 元数据文件夹的 Hadoop 文件夹:

表的 Hadoop 文件夹
Hadoop Folder for table
Hadoop 中的 Iceberg metadata.json 文件
Iceberg metadata.json file in Hadoop

接下来,我还在 Pycharm IDE 中添加了 Hive 和 Hadoop 的连接,这样在编写 Pyspark 脚本时就可以浏览并查看表的扩展属性。如果您运行的是旧版本的 IDE,则需要添加 Spark 插件;如果您使用的是“专业版”许可证,新版本会自带该插件。


打开“大数据工具”创建连接:

PyCharm大数据工具
PyCharm Big Data Tools









点击左侧的“ Hive Metastore ”按钮图标,即可查看表规格的详细信息。点击按钮后,屏幕右侧会出现面板,显示表属性信息(第 1 页 + 第 2 页):

Hive 元存储
Hive Metastore
表格属性第 1 页
Table Attribute Page 1















表格属性第 2 页
Table Attributes Page 2

然后,我使用 Spark 加载了我在Iceberg 系列文章中使用过的数据文件,并在“test_company_stocks”表中创建了记录。我将在本文的第二部分介绍如何使用 Spark,但加载完成后,我可以看到数据已持久化。我将该文件加载到一个单独的暂存表中,然后在 IDE 中执行了 SQL 操作:“ INSERT INTO.....SELECT * FROM staging_table” ,以便演示发生了什么。


在 DBeaver 中运行“ INSERT ”命令后,您可以立即转到 HiveServer2 控制台,单击查询,然后转到“操作日志”(假设您已像我之前一样进行了配置)。您应该看到如下输出:

SQL查询操作日志
Operation Log for SQL Query

如果你将高亮显示的 URL 复制/粘贴到浏览器标签页中,你应该会看到“Hadoop 应用程序”窗口。在这里,你可以监视 Hadoop 中将数据从一个表“转换”到另一个表的“MapReduce”作业:

Hadoop 应用程序窗口
Hadoop Application Window

如果您点击左侧菜单中的“工具 -> 日志”,则可以在日志文件中查看作业的进度。如果您真的感兴趣,可以将“tmp”或“scratchdir”设置为 Hive 的特定位置,然后可以看到操作系统文件系统或 Hadoop 文件系统中的临时对象(如果您为暂存文件选择了此选项)。


完成后,我回到 DBeaver,运行“select * from test_company_stock”,我可以看到以下内容:

来自 Hive 的结果集
Result Set from Hive

由于使用的是 Iceberg,Hadoop 中的“test_company_stocks”文件夹现在应该有一个名为“data”的子文件夹,其中包含分区和 parquet 文件。在我的例子中确实如此,如下所示:


带分区的数据目录
Data Directory with Partitions
分区内的 Parquet 数据文件
Parquet Data File within Partition

至此,我对我的基础设施(除了 Apache Drill)感到满意,它已经可以开始数据处理了,而且 Spark + Hive + Iceberg/Hadoop 也已经通过了验证。下一篇文章将重点介绍一些数据处理场景,以及如何将 Apache Drill 与 Hive Catalog 结合使用。


对于那些好奇的人来说,下面是我用来用 Spark 加载数据的脚本;包含一些额外的步骤来演示一些要点。

from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme

spark = (SparkSession
         .builder
         .appName("Python Spark SQL Hive integration example")
		 .master("local[*]")
         .config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
         .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
         .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.local.type", "hive")
         .config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
         .config("spark.sql.defaultCatalog", "local")
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
		 .config("spark.sql.catalogImplementation", "hive")
         .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
         .config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
         .config("spark.sql.warehouse.external.enabled", "true")
         .enableHiveSupport()
         .getOrCreate())

# spark is an existing SparkSession
hive_jdbc = {
	"url": "jdbc:hive2://localhost:10001/default;transportMode=http;httpPath=cliservice",
	"driver": "org.apache.hive.jdbc.HiveDriver",
	"table": "company_stocks"
}

# Read from Hive using JDBC
dfh = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"))
print(dfh.schema.fieldNames())

df = spark.read.csv('hdfs://localhost:9000/input_files/company-reference.txt',
                                   header=True, inferSchema=True, sep='|')

# Fixing dataframe types
df_new = df.withColumn("AsOfDate", col("AsOfDate").cast(TimestampType())) \
  	.withColumn("ExchangeShortname", col("ExchangeShortName").cast(StringType())) \
  	.withColumn("Employees", col("Employees").cast(IntegerType())) \
  	.withColumn("ADR", col('ADR').cast(BooleanType())) \
  	.withColumn("isFund", col('isFund').cast(BooleanType())) \
  	.withColumn("isETF", col('isETF').cast(BooleanType())) \
  	.withColumn("AverageVolume", col("AverageVolume").cast(DoubleType())) \
  	.withColumn("Beta", col("Beta").cast(DoubleType())) \
  	.withColumn("Discounted_Cash_Flow", col("Discounted_Cash_Flow").cast(DoubleType())) \
  	.withColumn("Discounted_Cash_Flow_Diff", col("Discounted_Cash_Flow_Diff").cast(DoubleType())) \
  	.withColumn("IPO_Date", col("IPO_Date").cast(DateType()))
print(df.printSchema())

# Fixing column order
df_new = df_new.select(col("AsOfDate"), col("CompanyName"), col("Ticker"), col("CIK"), col("ISIN"), col("CUSIP"),
                         col("Description"),col("Exchange"), col("ExchangeShortName"), col("Sector"), col("Country"),
                         col("Currency"), col("City"), col("State"), col("Address"), col("Employees"), col("CEO"),
                         col("Website"), col("ZipCode"), col("ADR"), col("isFund"), col("isETF"), col("AverageVolume"),
                         col("Beta"), col("Discounted_Cash_Flow"), col("Discounted_Cash_Flow_Diff"), col("IPO_Date"),
                         col("Industry"))

df_new.createOrReplaceTempView("vw_raw_company_stocks")
# Save using dataframe API
df_new.write.mode("append").insertInto("default.raw_company_stocks")

# Save using Spark SQL
spark.sql("insert into default.company_stocks "
             "select AsOfDate, CompanyName, StockSymbol, CIK, ISIN, CUSIP, Description, `Exchange`, 	  `ExchangeShortName`, Sector, Country, Currency, City, State, Address, Employees, CEO, Website, ZipCode, ADR, isFund, isETF, AverageVolume, Beta, Discounted_Cash_Flow, Discounted_Cash_Flow_Diff, IPO_Date, Industry from default.raw_company_stocks")

print("Company Stocks")
spark.sql("select * from default.company_stocks").show()

# Connect to Hive using Metastore Client & Thrift
hive_con = hme.HiveMetastoreClient(host="localhost", port=9083)
hive_client = hme.HiveMetastoreClient.open(hive_con)
hme_df = hive_client.get_table(dbname="default", tbl_name="raw_company_stocks")
print(hme_df)


bottom of page