Apache Iceberg, Hadoop, & Hive: Open your Datalake (Lakehouse) -> Part I

  • Written by: Claude Paugh
  • June 16
  • 13 min read

Updated: June 24

In a previous post, I gave a short summary of the criteria that distinguish a datalake from a lakehouse. The key take-away was that data management and organization are what make a lakehouse, while the lack of them, along with a higher velocity of data inputs, points toward a datalake. Now I have moved on to how you can make either of those patterns, datalake or lakehouse, usable for users to access. I am going to break this subject into two parts: the first focuses on infrastructure, and the second will cover data content and access.
Datalake and Hive

Despite the differences, you can take some large steps towards a lakehouse or datalake architecture when deploying your infrastructure. Infrastructure alone obviously does not cover the additional processes that make up a data management practice, but you can position yourself to move in that direction.


My previous posts on Apache Iceberg pointed out the data management features of Iceberg, especially in schema management. So I started my mini-model architecture by leveraging Iceberg.


For my data processing platform I am sticking with open source, though I don't think the commercial products are necessarily better. The platform decision was somewhat more complex. I have used and deployed both Dask and Apache Spark as processing engines capable of parallel processing on CPUs (very highly preferred due to $$$) and GPUs. You obviously need the GPU vendor libraries to extend either of them, but data processing is a bit of a waste of GPUs - they are not well suited for the task.


I decided to go with Apache Spark due to its large community and vendor support. Ramp-up time for deployment and operations is very quick when nodes are in the single digits - unless you're using a cloud provider service that can scale quickly. Spark also does not require coding Python functions to enable parallelism. Since I was leveraging Iceberg (and Hadoop), there is built-in integration with Spark, which should make things smoother (I think).


Finally, to enable user query access to the data, I went with Apache Hive and Apache Drill due to their integration with each other. There are many other open source tools and commercial products that may make more sense based on your specific requirements, but this will get you started without licensing costs.


The following is a depiction of those choices with arrows representing the data flows (ideally):

Expected Data Flow

Datalake: Setup Apache Hadoop & Apache Iceberg

Installation of a JDK (v8 or newer) is required for Hadoop. I used Homebrew to install openjdk@17, since it was the version that supports all of the Apache products I listed above.


Depending upon the platform you're using, you have a choice of manual installation via download or using a package manager. I also used Homebrew to install Hadoop:

brew install hadoop
cd "/opt/homebrew/Cellar/hadoop/3.3.6/libexec/etc/hadoop"

vi core-site.xml 

I had already done this as part of the Iceberg install from my post on that topic, but basically your core-site.xml should look something like the one below. Pay particular attention to the file-system and authentication properties; since I have a local setup, I allowed anonymous authentication. In most cases authentication should be enabled/required. The main prerequisite is to have file-system locations and space available to install Hadoop.

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>/Volumes/ExtShield/opt/hdfs/tmp</value>
    <description>A base for other temporary directories</description>             
  </property>
<property>
      <name>hadoop.http.filter.initializers</name>
      <value>org.apache.hadoop.security.AuthenticationFilterInitializer</value>
      <description>
                    Authentication for Hadoop HTTP web-consoles
                    add to this property the org.apache.hadoop.security.AuthenticationFilterInitializer initializer class.
      </description>
</property>
<property>
      <name>hadoop.http.authentication.type</name>
      <value>simple</value>
      <description>
                    Defines authentication used for the HTTP web-consoles. 
                    The supported values are: simple | kerberos | #AUTHENTICATION_HANDLER_CLASSNAME#. 
                    The default value is simple.
      </description>
</property>
<property>
      <name>hadoop.http.authentication.token.validity</name>
      <value>36000</value>
      <description>
                    Indicates how long (in seconds) an authentication token is valid before it has to be renewed. 
                    The default value is 36000.
      </description>
</property>
<property>
      <name>hadoop.http.authentication.standard.config.path</name>
      <value>/Users/claude_paugh/hadoop/auth.conf</value>
      <description>
                    The signature secret file for signing the authentication tokens. 
                    The same secret should be used for all nodes in the cluster, JobTracker, NameNode, DataNode and TaskTracker. 
                    The default value is $user.home/hadoop-http-auth-signature-secret. 
                    IMPORTANT: This file should be readable only by the Unix user running the daemons.
      </description>
</property>
<property>
      <name>hadoop.http.authentication.simple.anonymous.allowed</name>
      <value>true</value>
      <description>
                     Indicates if anonymous requests are allowed when using ‘simple’ authentication. 
                     The default value is true
      </description>
</property>
<property>
      <name>hadoop.http.authentication.signature.secret.file</name>
      <value>/Users/claude_paugh/hadoop/hadoop-http-auth-signature-secret</value>
      <description>
                    The signature secret file for signing the authentication tokens. 
                    The same secret should be used for all nodes in the cluster, JobTracker, NameNode, DataNode and TaskTracker. 
                    The default value is $user.home/hadoop-http-auth-signature-secret. 
                    IMPORTANT: This file should be readable only by the Unix user running the daemons.
      </description>
</property>
<property>
	<name>iceberg.engine.hive.enabled</name>
	<value>true</value>
</property>
</configuration>

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
     <name>dfs.replication</name>
     <value>1</value>
  </property>
  <property>
     <name>dfs.permissions</name>
     <value>false</value>
  </property>
  <property>
     <name>dfs.namenode.name.dir</name>
     <value>file:/Volumes/ExtShield/opt/hdfs/namenode/data</value>
  </property>
    <property>
     <name>dfs.namenode.checkpoint.dir</name>
     <value>file:/Volumes/ExtShield/opt/hdfs/namenode/checkpoint</value>
  </property>
  <property>
     <name>dfs.datanode.data.dir</name>
     <value>file:/Volumes/ExtShield/opt/hdfs/datanode/data</value>
  </property>
</configuration>

mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
 <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>
</configuration>

yarn-site.xml

<?xml version="1.0"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_HOME,PATH,LANG,TZ,HADOOP_MAPRED_HOME</value>
    </property>
</configuration>

The above list of configuration files reflects the configuration I have been using with Apache Iceberg as well. Next, I set up the .bashrc script in my home directory to include the variables for Hadoop. If .bashrc does not run when your shell session opens, you need to run "source ~/.bashrc" manually. The variables included are required to operate (startup/shutdown, jobs) the Iceberg/Hadoop, Hive, and Spark products:

.bashrc -- Hadoop Only

export PYTHON_BIN="/usr/local/bin"
export JAVA_HOME="/opt/homebrew/opt/openjdk@21"
export PYTHONPATH=$PYTHON_BIN
eval "$(/opt/homebrew/bin/brew shellenv)"
export PYTHONPATH=$PYTHONPATH:/Users/claude_paugh/airflow/arf/site-packages/trading-0.1.0b0.dist-info

export HDFS_NAMENODE_USER="claude_paugh"
export HDFS_DATANODE_USER="claude_paugh"
export HDFS_SECONDARYNAMENODE_USER="claude_paugh"
export HDF5_USE_FILE_LOCKING="FALSE"

export HADOOP_HEAPSIZE=2048
export HADOOP_HOME="/opt/homebrew/Cellar/hadoop/3.4.1/libexec"
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_CLIENT_OPTS="-Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE"
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

export PATH=$PATH:$PYTHONPATH:$JAVA_HOME:$HADOOP_HOME:$HADOOP_HOME/bin

You can start up and shut down Hadoop using the start-all.sh and stop-all.sh scripts in $HADOOP_HOME/sbin. If you're using the Python libraries like I am, you will need to install the pyiceberg package via pip or conda. At that point, if start-all.sh runs correctly, go to http://localhost:9870/ in your browser and you should see something like the below:

Hadoop UI

I had previously downloaded the JAR for Apache Iceberg, so at this point I can add the Iceberg jar to my classpath (or $HADOOP_HOME/lib), and I should be fully Iceberg + Hadoop functional.
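
Pulling the operational pieces of this section together, the commands look roughly like the following sketch; the Iceberg JAR path and filename are placeholders for whichever runtime JAR you downloaded:

# start (and later stop) the HDFS and YARN daemons
$HADOOP_HOME/sbin/start-all.sh
$HADOOP_HOME/sbin/stop-all.sh

# Python client library for Iceberg, if you work from Python like I do
pip install pyiceberg

# put the previously downloaded Iceberg runtime JAR on the Hadoop classpath
cp /path/to/iceberg-runtime.jar $HADOOP_HOME/lib/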


Setup Apache Spark

My next step was to set up Spark standalone. I decided to give the 4.0.0 release a try, and again I used Homebrew to install it. It was working fine with my Hadoop/Iceberg installation, but upon installing Apache Hive 4.0.1 I started having intermittent issues, which others had also seen, concerning catalog recognition, so I went with Spark 3.5.6 instead, using Homebrew once more. Similar to the Hadoop install above, there are start-all.sh and stop-all.sh scripts in the $SPARK_HOME/sbin directory. After starting up, you should be able to go to http://127.0.0.1:8080/ and see this:

Spark Standalone UI
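
For reference, the standalone start and stop commands mentioned above are simply:

# start / stop the standalone master and workers
$SPARK_HOME/sbin/start-all.sh
$SPARK_HOME/sbin/stop-all.sh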

I also created and customized my $SPARK_HOME/conf/spark-defaults.conf, which is read when Spark launches; it contained the following:

spark.jars.packages                                             org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1
spark.sql.extensions                                           org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.hive.metastore.uri                                    thrift://localhost:9083
spark.sql.warehouse.dir                                         hdfs://localhost:9000/user/hive/warehouse
spark.sql.catalogImplementation                                 hive
spark.sql.catalog.spark_catalog                                 org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type                            hive
spark.sql.catalog.spark_catalog.warehouse                       hdfs://localhost:9000/user/spark/warehouse

The values are directed at Hive usage and integration. I also created (from the template) and modified the spark-env.sh script in the $SPARK_HOME/conf directory to contain the following:

export SPARK_MASTER_HOST=127.0.0.1
export SPARK_MASTER_PORT=7077
 
# Options read in any cluster manager using HDFS
export HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
export HADOOP_HOME=$HADOOP_HOME

# Pyspark
PYSPARK_DRIVER_PYTHON=python
PYSPARK_PYTHON=python

At this point I had a functional Spark environment. I tested whether the Scala and PySpark shells would launch from the CLI (they did), and ran a few statements. I then did a pip install of PySpark, since I did not plan to use the shell when testing. I tested the rest of my Spark install by running a simple script that opened a SparkSession and read a delimited file from an HDFS location into a dataframe, with PySpark imported into the script (a minimal sketch of that test follows the variables below). I had also added the following variables to the .bashrc file:

export SPARK_REMOTE="sc://localhost"
export SPARK_HOME="/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/libexec"
export PYTHONPATH=$HADOOP_HOME:$SPARK_HOME/jars:$PYTHONPATH
export PATH=$PATH:$HADOOP_HOME:$SPARK_HOME:$HADOOP_HDFS_HOME
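
A minimal version of that smoke-test script might look like the sketch below; it assumes the master address and HDFS namenode configured above, and the file path and delimiter are just examples:

from pyspark.sql import SparkSession

# connect to the standalone master configured in spark-env.sh
spark = (SparkSession.builder
         .appName("hdfs-smoke-test")
         .master("spark://127.0.0.1:7077")
         .getOrCreate())

# read a delimited file from HDFS into a dataframe
df = spark.read.csv("hdfs://localhost:9000/input_files/sample.txt",
                    header=True, inferSchema=True, sep="|")
df.printSchema()
print(df.count())

spark.stop()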

Setup Apache Hive

As I previously stated, I had started with Apache Spark 4.0.0 and Apache Hive 4.0.1, but after some issues I stepped my Spark version down to 3.5.6 and installed the 4.0.0 version of Apache Hive as well. You can find the historical releases of Hive here. I downloaded the apache-hive-4.0.0-bin.tar.gz file and moved it to my target location, then ran the following in a shell on my MacBook:

gunzip apache-hive-4.0.0-bin.tar.gz
tar -xvf apache-hive-4.0.0-bin.tar 

Then, I added the following environment variables to my .bashrc:

export HIVE_OPTS="-hiveconf mapreduce.map.memory.mb=8192 -hiveconf mapreduce.reduce.memory.mb=5120"
export HIVE_HOME="/Volumes/ExtShield/opt/apache-hive-4.0.0-bin"
export HIVE_CONF_DIR=$HIVE_HOME/conf
export HIVE_SERVER2_THRIFT_BIND_HOST="localhost"
export HIVE_SERVER2_THRIFT_PORT="10001"
export PYTHONPATH=$PYTHONPATH:$HIVE_HOME/lib
export PATH=$PATH:$HIVE_HOME/bin

The memory limits are there to avoid "out of memory" errors; the rest are self-explanatory, based on reading the Hive documentation. The variables referencing "HIVE_SERVER2" are needed for the HiveServer2 installation, which is coming up.


I had previously used a remote PostgreSQL database for the Iceberg catalog, so I went down the same path with Hive. Re-using the Postgres install from that deployment, I copied the Postgres JDBC driver to $HIVE_HOME/lib so it would be added to the Hive classpath.
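
That copy is a one-liner; the driver filename below is an example, so use whichever version you downloaded:

# put the Postgres JDBC driver on Hive's classpath
cp postgresql-42.7.3.jar $HIVE_HOME/lib/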


I started with the most recent Hive documentation, and the AdminManual, then downloaded the 4.0.0 release of Hive for the tarball install. I went through the instructions, and made changes to my .bashrc and config as indicated, and moved on to the Hive Metastore installation.


As I indicated above, I was going to use a remote metastore with PostgreSQL, so I manually created the Postgres database, named metacatalog, over a JDBC connection using DataGrip. I then created a hive-site.xml from the template file provided and started to update the config, substituting Postgres for the MySQL shown in the website doc. You will also notice that I included some parameters used for setting up HiveServer2; details are below:

Config Param / Value / Comment:

javax.jdo.option.ConnectionURL
    Value: jdbc:postgresql://10.0.0.30:5439/metacatalog?ssl=false;create=false
    Comment: metadata is stored in a PostgreSQL server

javax.jdo.option.ConnectionDriverName
    Comment: JDBC driver class

javax.jdo.option.ConnectionUserName
    Value: <username>
    Comment: username for connecting to Postgres

javax.jdo.option.ConnectionPassword
    Value: <password>
    Comment: password for connecting to Postgres

hive.metastore.warehouse.dir
    Comment: default location for Hive tables (Hadoop)

hive.metastore.thrift.bind.host
    Comment: Host name to bind the metastore service to. When empty, "localhost" is used. This configuration is available Hive 4.0.0 onwards.

hive.metastore.uris
    Comment: host and port for the Thrift metastore server. If hive.metastore.thrift.bind.host is specified, the host should be the same as that configuration. Read more about this in the dynamic service discovery configuration parameters.

I am sure everyone will take note that storing usernames and passwords in clear text in hive-site.xml is not acceptable in most security setups. I would expect most organizations to use SAML 2.0 and single sign-on.
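
Put together, the metastore-related portion of hive-site.xml looks roughly like the sketch below. The values mirror the table above and my local setup (the standard Postgres driver class is org.postgresql.Driver); adjust the host, credentials, and warehouse path for yours:

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:postgresql://10.0.0.30:5439/metacatalog?ssl=false;create=false</value>
</property>
<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.postgresql.Driver</value>
</property>
<property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value><!-- username --></value>
</property>
<property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value><!-- password --></value>
</property>
<property>
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://localhost:9000/user/hive/warehouse</value>
</property>
<property>
    <name>hive.metastore.thrift.bind.host</name>
    <value>localhost</value>
</property>
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
</property>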


Next, I started the hive metastore as a service in the background after the configuration changes.

$HIVE_HOME/bin/hive --service metastore &

I had originally tried to start the metastore with the configuration option "create=true" on the JDBC URL, but I was only having partial success. So I moved on to "create=false" and used schematool to perform a manual deployment. I was successful in starting the service and deploying the Hive catalog tables with the tool:

Postgres Hive Meta-catalog
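
For reference, the manual schema deployment uses the schematool that ships with Hive; with the Postgres settings above in hive-site.xml it is roughly:

# create the metastore schema in the Postgres database
$HIVE_HOME/bin/schematool -dbType postgres -initSchema
# verify the deployed schema afterwards
$HIVE_HOME/bin/schematool -dbType postgres -info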

In the release of the catalog that comes with Hive 4.0.0, you should have 83 tables in the database, assuming it was empty to start. After starting the metastore service with the parameter changes and the schematool run, everything was clean in the startup logs, so I then moved on to the HiveServer2 deployment.


I had already included some of the parameters above for HiveServer2, but I still needed to make a few more changes to hive-site.xml in order to complete the task. First, I wanted to run "HTTP" mode instead of binary, so I updated hive.server2.transport.mode in the XML to "http". Then I added the minimum required values to the hive-site.xml file, including the values for running in "http mode" (table in link), and also did the following (a hive-site.xml sketch with these settings follows the list):

  1. Disabling SSL

  2. Used JDBC URL with http parameters: jdbc:hive2://localhost:10001/default;transportMode=http;httpPath=cliservice

  3. Logging Configuration

  4. Anonymous Authentication

  5. scratch dir configuration for Hive
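
A sketch of what those hive-site.xml additions can look like; the property names are standard HiveServer2 settings, while the scratch directory path and the NONE authentication value are assumptions matching my anonymous local setup:

<property>
    <name>hive.server2.transport.mode</name>
    <value>http</value>
</property>
<property>
    <name>hive.server2.thrift.http.port</name>
    <value>10001</value>
</property>
<property>
    <name>hive.server2.thrift.http.path</name>
    <value>cliservice</value>
</property>
<property>
    <name>hive.server2.use.SSL</name>
    <value>false</value>
</property>
<property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
</property>
<property>
    <name>hive.server2.logging.operation.enabled</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.scratchdir</name>
    <value>/tmp/hive</value>
</property>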


Then I started the service:

$HIVE_HOME/bin/hive --service hiveserver2

Visiting http://localhost:10002, I saw the below:


HiveServer2 Home UI

At this point it appeared everything was at least running, so I installed and opened DBeaver in order to validate Hive's functionality. I created a JDBC connection to Hive in DBeaver, adding a Hive datasource with no auth and using the URL from above:

Hive JDBC Connection

I connected and saw the following:

Connected to Hive

Opened URL for HiveServer2:

Hive UI Output

I had validated a connection to the Hive database; next I wanted to create a table to validate the integration of Hive + Iceberg + Hadoop. I sourced the "create table" statement that I had used in the Apache Iceberg posts, and made a simple modification to include the row format serde for Hive/Iceberg.
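
I won't reproduce the full DDL here, but a trimmed-down sketch of it, using the Hive/Iceberg serde and storage handler classes from Iceberg's Hive (iceberg-mr) module, looks like this; the column list is abbreviated and the partition spec is omitted:

-- abbreviated sketch of the Iceberg-backed Hive table
CREATE TABLE default.test_company_stocks (
    AsOfDate       TIMESTAMP,
    CompanyName    STRING,
    Ticker         STRING,
    Sector         STRING,
    AverageVolume  DOUBLE
)
ROW FORMAT SERDE 'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';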


Ran the script:

Created Table
Hive Catalog of Table

Hive UI Output:

Clicking on the Drilldown link gets you execution plans and operation logs for the statement that was run:

Hive UI Output for Create Table

Hadoop folder containing metadata folder for Apache Iceberg:

Hadoop Folder for table
Iceberg metadata.json file in Hadoop

Next, I also added connections to Hive and Hadoop in my PyCharm IDE, so I can browse and see extended attributes for tables when writing PySpark scripts. You need to add the Spark plugin if you're running an older version of the IDE; newer versions come with it installed if you're using a "Pro" license.


Open "Big Data Tools" to create the connections:

PyCharm Big Data Tools

This gives me a detailed view of the table specifications by clicking on the "Hive Metastore" button icon (left). Panels should appear on the right of the screen after the button click, presenting information on the table attributes (pages 1 and 2):

Hive Metastore
Table Attribute Page 1

Table Attributes Page 2

I then loaded a data file that I had used in the Iceberg article series, using Spark, and created records in the "test_company_stocks" table. I am going to cover using Spark in Part II of this article, but after loading I can see the data persisted. I loaded the file into a separate staging table and then performed a SQL operation in the IDE ("INSERT INTO ... SELECT * FROM staging_table"), so I could demonstrate what occurs.
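
The statement itself is nothing more than a copy from staging to target; the staging table name below is illustrative, so substitute whatever you loaded the file into:

-- copy the staged rows into the Iceberg-backed table
INSERT INTO test_company_stocks
SELECT * FROM staging_company_stocks;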


After you run the "INSERT" command in DBeaver, you can immediately go to the HiveServer2 console, click on the query, and then go to "Operation Log" (assuming you have configured it as I did earlier). You should see output like:

Operation Log for SQL Query

If you copy/paste the highlighted URL into a browser tab, you should see the "Hadoop Applications" Window. This is where you can monitor the "MapReduce" job that "translates" the data from one table to another within Hadoop:

Hadoop Application Window

If you click on the "Tools -> Logs" from the left menu, you can see the progress of the job in the log files. For those really curious, if you set the "tmp" or "scratchdir" to specific locations for Hive, you can see the temporary objects in the OS file system or Hadoop file system, if you chose that option for scratch files.


After completion, I went back to DBeaver and ran "select * from test_company_stocks", and I could see this:

Result Set from Hive

The "test_company_stocks" folder in Hadoop should now have a subfolder called "data" with partitions and parquet files, since it's using Iceberg. In my case it did, as shown below:


Data Directory with Partitions
Parquet Data File within Partition

At this point I was satisfied that my infrastructure, minus Apache Drill, was in good shape to start data processing, and Spark + Hive + Iceberg/Hadoop had been validated. The next post will focus on some data processing scenarios and using Apache Drill with the Hive Catalog.


Below is the script I used to load the data with Spark, for those curious; it contains some additional steps to demonstrate a few points.

from pyspark.sql.types import DoubleType, DateType, BooleanType, IntegerType, StringType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import hive_metastore_client as hme

spark = (SparkSession
         .builder
         .appName("Python Spark SQL Hive integration example")
		 .master("local[*]")
         .config("spark.jars", "/Volumes/ExtShield/opt/spark-3.5.6-bin-hadoop3/jars/iceberg-spark-runtime-3.5_2.12-1.9.1.jar")
         .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1")
         .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.local.type", "hive")
         .config("spark.sql.catalog.local.warehouse", "hdfs://localhost:9000/user/hive/warehouse")
         .config("spark.sql.defaultCatalog", "local")
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
		 .config("spark.sql.catalogImplementation", "hive")
         .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
         .config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
         .config("spark.sql.warehouse.external.enabled", "true")
         .enableHiveSupport()
         .getOrCreate())

# spark is an existing SparkSession
hive_jdbc = {
	"url": "jdbc:hive2://localhost:10001/default;transportMode=http;httpPath=cliservice",
	"driver": "org.apache.hive.jdbc.HiveDriver",
	"table": "company_stocks"
}

# Read from Hive using JDBC
dfh = spark.read.jdbc(url=hive_jdbc.get("url"), table=hive_jdbc.get("table"),
                      properties={"driver": hive_jdbc.get("driver")})
print(dfh.schema.fieldNames())

df = spark.read.csv('hdfs://localhost:9000/input_files/company-reference.txt',
                                   header=True, inferSchema=True, sep='|')

# Fixing dataframe types
df_new = df.withColumn("AsOfDate", col("AsOfDate").cast(TimestampType())) \
  	.withColumn("ExchangeShortname", col("ExchangeShortName").cast(StringType())) \
  	.withColumn("Employees", col("Employees").cast(IntegerType())) \
  	.withColumn("ADR", col('ADR').cast(BooleanType())) \
  	.withColumn("isFund", col('isFund').cast(BooleanType())) \
  	.withColumn("isETF", col('isETF').cast(BooleanType())) \
  	.withColumn("AverageVolume", col("AverageVolume").cast(DoubleType())) \
  	.withColumn("Beta", col("Beta").cast(DoubleType())) \
  	.withColumn("Discounted_Cash_Flow", col("Discounted_Cash_Flow").cast(DoubleType())) \
  	.withColumn("Discounted_Cash_Flow_Diff", col("Discounted_Cash_Flow_Diff").cast(DoubleType())) \
  	.withColumn("IPO_Date", col("IPO_Date").cast(DateType()))
df_new.printSchema()  # verify the corrected schema

# Fixing column order
df_new = df_new.select(col("AsOfDate"), col("CompanyName"), col("Ticker"), col("CIK"), col("ISIN"), col("CUSIP"),
                         col("Description"),col("Exchange"), col("ExchangeShortName"), col("Sector"), col("Country"),
                         col("Currency"), col("City"), col("State"), col("Address"), col("Employees"), col("CEO"),
                         col("Website"), col("ZipCode"), col("ADR"), col("isFund"), col("isETF"), col("AverageVolume"),
                         col("Beta"), col("Discounted_Cash_Flow"), col("Discounted_Cash_Flow_Diff"), col("IPO_Date"),
                         col("Industry"))

df_new.createOrReplaceTempView("vw_raw_company_stocks")
# Save using dataframe API
df_new.write.mode("append").insertInto("default.raw_company_stocks")

# Save using Spark SQL
spark.sql("insert into default.company_stocks "
          "select AsOfDate, CompanyName, StockSymbol, CIK, ISIN, CUSIP, Description, `Exchange`, "
          "`ExchangeShortName`, Sector, Country, Currency, City, State, Address, Employees, CEO, "
          "Website, ZipCode, ADR, isFund, isETF, AverageVolume, Beta, Discounted_Cash_Flow, "
          "Discounted_Cash_Flow_Diff, IPO_Date, Industry from default.raw_company_stocks")

print("Company Stocks")
spark.sql("select * from default.company_stocks").show()

# Connect to Hive using Metastore Client & Thrift
hive_con = hme.HiveMetastoreClient(host="localhost", port=9083)
hive_client = hme.HiveMetastoreClient.open(hive_con)
hme_df = hive_client.get_table(dbname="default", tbl_name="raw_company_stocks")
print(hme_df)


 
 