Apache Hudi on HDInsight Spark
When building a data lake or lakehouse on Azure, most people are familiar with Delta Lake (Delta Lake on Synapse, Delta Lake on HDInsight, and Delta Lake on Azure Databricks), but other open table formats exist as well, such as Apache Hudi and Apache Iceberg.
Apache Hudi can be used with any of the popular query engines like Apache Spark, Flink, or Hive, and data stays in open file formats like Parquet. Performance-wise, several benchmarks have been published recently, with Hudi achieving remarkable results (see the references below).
In this article, I walk through a Hudi quickstart for HDInsight (Spark), following the Hudi on Synapse article from OneHouse. Let’s get started!
> Jump directly to the GitHub repo if you want to stop reading.
Hudi and HDInsight Spark Quickstart
Using Hudi on HDInsight (Spark) is as simple as loading a library into a Jupyter notebook on an HDInsight Spark cluster. If you already have an HDInsight 5.0 Spark cluster, you can skip the prerequisite below.
Prerequisite: Create a HDInsight Cluster
If you don’t have one already, create an HDInsight 5.0 Spark cluster (which comes with Spark 3.1.3) with ADLS Gen2.
> With Spark 3.1.x in HDInsight, you can use Hudi 0.10+. See the Spark 3.x version support matrix for Hudi here.
Create a Notebook
Create a Jupyter notebook in HDInsight. If you don’t want to copy/paste the code below, you can download the notebook from GitHub and import it into your HDInsight cluster.
- Configure the Hudi lib: Specify the Spark Hudi configuration in valid JSON format in the first cell of the Jupyter notebook.
%%configure -f
{ "conf": {"spark.jars.packages": "org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.2" }}
Write Sample Data to ADLS Gen2
- Create sample data: Create a dummy DataFrame.
new_rows = [("234fr", "CA", 22, 45000, "2022-05-22"),
            ("edf56", "AK", 35, 65000, "2022-06-22"),
            ("001uj", "WA", 50, 85000, "2022-07-22")]
demo_df = spark.createDataFrame(new_rows, ['id', 'state', 'age', 'salary', 'date'])
demo_df.show()
- Set Hudi write configs: Choose a Hudi base path and set basic write configs. See more on writing data.
basePath = "abfs://default@stmurggu9clngm.dfs.core.windows.net/hudi-test/"
tableName = "hudi_test_data"
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'id',      # unique key for each record
    'hoodie.datasource.write.operation': 'upsert',        # insert new keys, update existing ones
    'hoodie.datasource.write.precombine.field': 'date'    # picks the latest version when keys collide
}
- Write to ADLS Gen2 as a Hudi table: Write the sample dataset to ADLS Gen2 as a Hudi table. Go to ADLS Gen2 to see those files.
demo_df.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)
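To verify the write, you can read the path back with the hudi format; note the extra _hoodie_* metadata columns Hudi adds to each record:
# Read the Hudi table back to verify the write
spark.read.format("hudi").load(basePath).show()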
Create a SQL table
Create a managed or external Spark SQL table using the HUDI keyword and query the table.
spark.sql("DROP TABLE IF EXISTS HudiDemoTable")
spark.sql("CREATE TABLE HudiDemoTable USING HUDI LOCATION '{0}'".format(basePath))
%%sql
select * from HudiDemoTable
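Hudi’s metadata columns are also visible through SQL; _hoodie_record_key, which identifies each record by its key, is what the point lookups in the next sections filter on. For example:
%%sql
select _hoodie_commit_time, _hoodie_record_key, id, state from HudiDemoTable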
Upserts/Merges
Query a single record and check the original state value. Then set the write operation to upsert, change the state to WA, and write the updated record in append mode. Reading the record back shows that the original value has been updated.
origvalue = spark. \
read. \
format("hudi"). \
load(basePath). \
where("_hoodie_record_key = 'edf56'")
origvalue.select("state").show()
from pyspark.sql.functions import lit
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'date'
}
updatevalue = origvalue.withColumn("state", lit("WA"))
updatevalue.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)
testupdate = spark. \
read. \
format("hudi"). \
load(basePath). \
where("_hoodie_record_key = 'edf56'")
testupdate.select("state").show()
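Because the write is an upsert keyed on id, the existing record is rewritten rather than duplicated; counting the table still returns the original three rows:
# Upsert updates by record key, so the row count is unchanged
print(spark.read.format("hudi").load(basePath).count())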
Time travel
Query the dataset as of a point in time using a commit instant or a timestamp. You can find a commit instant by going to ADLS Gen2 and locating a {}.commit file within the .hoodie folder.
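Instead of browsing ADLS Gen2, you can also list the commit instants straight from the table’s metadata column:
# List the distinct commit instants recorded in the table
spark.read.format("hudi").load(basePath). \
select("_hoodie_commit_time").distinct().show(truncate=False)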
testupdate = spark. \
read. \
format("hudi"). \
option("as.of.instant", "20230203132655256"). \
load(basePath). \
where("_hoodie_record_key = 'edf56'")
testupdate.select("state").show()
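The as.of.instant option also accepts timestamp strings such as 'yyyy-MM-dd HH:mm:ss.SSS' or 'yyyy-MM-dd', so the same query can be written with a readable timestamp (the value below is the instant above rewritten in that format):
# Same time travel query with a timestamp-style instant
spark.read.format("hudi"). \
option("as.of.instant", "2023-02-03 13:26:55.256"). \
load(basePath). \
where("_hoodie_record_key = 'edf56'"). \
select("state").show()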
Incremental queries
You can ask Hudi for all the records that are new or updated after a given commit or timestamp. The example below uses the same commit instant.
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '20230203132655256',  # pull records written after this instant
}
demo_df_incremental = spark.read.format("hudi"). \
options(**incremental_read_options). \
load(basePath)
demo_df_incremental.show()
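By default an incremental query reads up to the latest commit; you can also cap the window with an end instant. The end instant below is a hypothetical later commit, purely for illustration:
# Bounded incremental read: begin instant is exclusive, end instant is inclusive
bounded_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '20230203132655256',
    'hoodie.datasource.read.end.instanttime': '20230203140000000',  # hypothetical later instant
}
spark.read.format("hudi").options(**bounded_read_options).load(basePath).show()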
Deletes
You can delete records in Hudi. Query the record you want to delete, set the Hudi write operation to delete, and write those records in append mode. Finally, confirm that the record was deleted.
todelete = spark. \
read. \
format("hudi"). \
load(basePath). \
where("_hoodie_record_key = '234fr'")
todelete.show()
hudi_delete_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'date'
}
todelete.write.format("hudi"). \
options(**hudi_delete_options). \
mode("append"). \
save(basePath)
todelete = spark. \
read. \
format("hudi"). \
load(basePath). \
where("_hoodie_record_key = '234fr'")
todelete.show()
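Alternatively, if you added the HoodieSparkSessionExtension to the configure cell (see the sketch at the top), Hudi also supports deletes through Spark SQL against the table registered earlier; a sketch:
# Requires the Hudi Spark session extension to be configured at session start
spark.sql("DELETE FROM HudiDemoTable WHERE id = 'edf56'")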
For any suggestions or questions, feel free to reach out :)
References:
Hudi on Azure —
- Hudi on Synapse by OneHouse
Delta Lake on Azure —
- Delta Lake on HDInsight
- Delta Lake on Synapse
- Azure Synapse and Delta Lake by James Serra
- Delta Lake on Azure Databricks
Delta Lake vs. Hudi vs. Iceberg —
- [May 2023] Lakehouse at Fortune 1 Scale by Samuel Guleff, Walmart Global Tech Blog
- [Jan 2023 updated, Aug 2022] Apache Hudi vs Delta Lake vs Apache Iceberg — Lakehouse Feature Comparison by OneHouse
- [Jan 2023] Analyzing and Comparing Lakehouse Storage Systems by UC Berkeley, Stanford University and Databricks
- [Nov 2022] Setting the Table: Benchmarking Open Table Formats by Brooklyn Data Co.
- [Aug 2022] Data Lake / Lakehouse Guide: Powered by Data Lake Table Formats (Delta Lake, Iceberg, Hudi) by Airbyte
- [Jul 2022] Delta vs Hudi: Databeans’ vision on Benchmarking by DataBeans
- [Jun 2022] Apache Hudi vs Delta Lake — Transparent TPC-DS Lakehouse Performance Benchmarks by OneHouse
- [Jun 2022] Delta vs Iceberg vs Hudi: Reassessing Performance by DataBeans