Apache Hudi on HDInsight Spark

Aitor Murguzur
4 min read · Feb 3, 2023


When building a data lake or lakehouse on Azure, most people are familiar with Delta Lake (Delta Lake on Synapse, Delta Lake on HDInsight and Delta Lake on Azure Databricks), but other open table formats also exist, like Apache Hudi and Apache Iceberg.

Apache Hudi can be used with any of the popular query engines like Apache Spark, Flink or Hive, and data can stay in open file formats like Parquet. Performance-wise, several benchmarks have been published recently, with Hudi showing strong results.

In this article, I introduce a Hudi quickstart for HDInsight (Spark), following the Hudi on Synapse article from OneHouse. Let’s get started!

> Jump directly to the GitHub repo if you want to stop reading.

Hudi and HDInsight Spark Quickstart

Using Hudi on HDInsight (Spark) is as simple as loading a library into a Jupyter notebook on an HDInsight Spark cluster. If you already have an HDInsight 5.0 Spark cluster, you can skip the prerequisite listed below.

Prerequisite: Create an HDInsight Cluster

If you don’t have one already, create an HDInsight 5.0 Spark cluster (which comes with Spark 3.1.3) with ADLS Gen2 as the primary storage.

> With Spark 3.1.x in HDInsight, you can use Hudi 0.10+. See the Spark 3.x version support matrix for Hudi here.
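Once you have a notebook attached to the cluster (next section), you can confirm the Spark version from a cell before picking a Hudi bundle:

# The spark session is pre-created in HDInsight Jupyter notebooks
print(spark.version)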

Create a Notebook

Create a Jupyter notebook in HDInsight. If you don’t want to copy/paste the code below, you can download the notebook from GitHub and import it into your HDInsight cluster.

  • Configure the Hudi lib: Specify the Spark Hudi configuration in valid JSON format in the first cell of the Jupyter notebook.
%%configure -f
{ "conf": { "spark.jars.packages": "org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.2" } }

Write Sample Data to ADLS Gen2

  • Create sample data: Create a dummy data frame.
new_rows = [("234fr", "CA", 22, 45000, "2022-05-22"),
    ("edf56", "AK", 35, 65000, "2022-06-22"),
    ("001uj", "WA", 50, 85000, "2022-07-22")]
demo_df = spark.createDataFrame(new_rows, ['id', 'state', 'age', 'salary', 'date'])
demo_df.show()
  • Set Hudi write configs: Choose a Hudi base path and set basic write configs. See more on writing data.
basePath = "abfs://default@stmurggu9clngm.dfs.core.windows.net/hudi-test/"
tableName = "hudi_test_data"

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'date'
}
  • Write to ADLS Gen2 as a Hudi table: Write the sample dataset to ADLS Gen2 as a Hudi table. Go to ADLS Gen2 to see the files that were written, or list them from the notebook as shown right after the write snippet below.
demo_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)
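If you prefer to stay in the notebook instead of opening the Azure portal, a rough way to list what Hudi wrote under the base path is to go through the Hadoop FileSystem API via PySpark’s JVM gateway (spark._jvm and spark._jsc are internal handles, so treat this as a sketch rather than a supported API):

# List the files and folders Hudi created under basePath
# (Hadoop FileSystem API accessed through PySpark's internal JVM gateway)
jvm = spark._jvm
fs = jvm.org.apache.hadoop.fs.FileSystem.get(
    jvm.java.net.URI(basePath), spark._jsc.hadoopConfiguration())
for status in fs.listStatus(jvm.org.apache.hadoop.fs.Path(basePath)):
    print(status.getPath().getName())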

Create a SQL table

Create a managed or external table over the Hudi location using the USING HUDI clause, then query it.

spark.sql("DROP TABLE IF EXISTS HudiDemoTable")
spark.sql("CREATE TABLE HudiDemoTable USING HUDI LOCATION '{0}'".format(basePath))
%%sql
select * from HudiDemoTable
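Besides your own columns, Hudi stores metadata columns (such as _hoodie_commit_time and _hoodie_record_key) in the table; you can inspect them with a regular query, for example from a PySpark cell:

# Show Hudi's metadata columns next to the data columns
spark.sql(
    "SELECT _hoodie_commit_time, _hoodie_record_key, id, state FROM HudiDemoTable"
).show(truncate=False)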

Upserts/Merges

Query a single record and check its original state value. Then keep the write operation as upsert, change the state to WA, and write the updated record in append mode. Reading the record again shows that the original value has been updated.

origvalue = spark. \
    read. \
    format("hudi"). \
    load(basePath). \
    where("_hoodie_record_key = 'edf56'")

origvalue.select("state").show()
from pyspark.sql.functions import lit

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'date'
}

updatevalue = origvalue.withColumn("state", lit("WA"))

updatevalue.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(basePath)

testupdate = spark. \
    read. \
    format("hudi"). \
    load(basePath). \
    where("_hoodie_record_key = 'edf56'")

testupdate.select("state").show()

Time travel

Query the dataset as of a point in time using a commit instant or a timestamp. You can find a commit instant by going to ADLS Gen2 and looking for a *.commit file inside the .hoodie folder.
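Alternatively, you can list the commit instants straight from the notebook by reading the table’s _hoodie_commit_time metadata column (a quick sketch; pick one of the returned values as the instant):

# List the distinct commit instants recorded in the Hudi metadata column
spark.read.format("hudi").load(basePath). \
    select("_hoodie_commit_time").distinct(). \
    orderBy("_hoodie_commit_time").show(truncate=False)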

testupdate = spark. \
    read. \
    format("hudi"). \
    option("as.of.instant", "20230203132655256"). \
    load(basePath). \
    where("_hoodie_record_key = 'edf56'")

testupdate.select("state").show()

Incremental queries

You can ask Hudi for all records that are new or updated after a given commit instant or timestamp. The example below reuses the commit instant from the previous step.

incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '20230203132655256'
}

demo_df_incremental = spark.read.format("hudi"). \
    options(**incremental_read_options). \
    load(basePath)

demo_df_incremental.show()
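Hudi can also bound the incremental window with an end instant (hoodie.datasource.read.end.instanttime). As a sketch following the point-in-time query from the Hudi quickstart, and reusing the commit instant above as the upper bound, this reads everything committed from the earliest commit up to that instant:

# Point-in-time query: '000' means "from the earliest commit";
# the end instant bounds the window
point_in_time_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '000',
    'hoodie.datasource.read.end.instanttime': '20230203132655256'
}

demo_df_point_in_time = spark.read.format("hudi"). \
    options(**point_in_time_options). \
    load(basePath)

demo_df_point_in_time.show()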

Deletes

You can delete records in Hudi. Query the record you want to delete, set the Hudi write operation to delete, and write those records in append mode. Finally, confirm that the record was deleted.

todelete = spark. \
    read. \
    format("hudi"). \
    load(basePath). \
    where("_hoodie_record_key = '234fr'")

todelete.show()

hudi_delete_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'date'
}

todelete.write.format("hudi"). \
    options(**hudi_delete_options). \
    mode("append"). \
    save(basePath)

todelete = spark. \
    read. \
    format("hudi"). \
    load(basePath). \
    where("_hoodie_record_key = '234fr'")

todelete.show()

For any suggestions or questions, feel free to reach out :)

Written by Aitor Murguzur

All things data. Principal PM @Microsoft. PhD in Comp Sci. All views are my own. https://www.linkedin.com/in/murggu/
