Getting started with Apache Hudi

DataCouch
15 min read · Dec 19, 2023


Introduction

In the world of data processing, batch processing is one of the oldest techniques still widely in use. Over time, teams realized that batch processing alone is not enough and that data processing needed to be reimagined to deliver concrete solutions for contemporary problems. This was done by adding “incremental” data processing capabilities alongside existing batch pipelines, allowing ingested data to be processed in near real-time or as streams.

The engineering team at Uber recognized this need and developed an internal data management framework named Hudi (short for Hadoop Upserts Deletes and Incrementals, pronounced “hoodie”). Uber later donated it to the Apache Software Foundation as an open-source project. Hudi offers a scan-optimized analytical abstraction layer that allows applying incremental mutations to data stored in HDFS, S3, Google Cloud Storage, Azure Blob Storage, and many other stores. These mutations are not latency-optimized for individual records and may take a few minutes to propagate when incremental processing is chained through an ETL pipeline.

Let’s explore this fully incremental data ingestion framework and learn how this moderately complex ETL tool can be used with data lakes.

Learning Objectives

  1. What is Apache Hudi?
  2. Evolution of Apache Hudi
  3. Why Apache Hudi?
  4. Apache Hudi Architecture
  5. Table and Query Types
  6. Getting started with Hudi on Spark

What is Apache Hudi?

Apache Hudi is a game-changing technology for spinning up data lakehouses to meet the data needs of the future. It revolutionizes how we interact with and process data within the data lakehouse environment. The initial goal, at the time of its inception, was to make sure that batch-oriented data pipelines can become incremental, fast, and efficient.

So, Hudi took the streaming stack and put it on top of the batch stack to offer an added layer of abstraction. It converged two independent stacks into a layered architecture that promised the best of both worlds to developers. And Hudi delivered on this promise hands-down!

In the noughties and early 2010s, most conventional databases exposed records as tables (i.e., groups or blocks of records) and were great at processing batches of data. To some extent, they could also capture incoming streaming data and add it to tables (at a considerable cost to efficiency, we must add), but not the other way around. Hudi allowed developers to reimagine the storage layer so that tables can be exposed as streams, with support for updates, deletes, and transaction processing.

This allowed developers to consume all data as streams, even when the underlying data came from a table, and to join it with other streams and snapshots, without ever having to move the data out of the data lakehouse.

Evolution of Apache Hudi

The development of Hudi as a data streaming solution began at Uber in the early 2010s, but it took concrete shape around the mid-2010s. Uber was processing streaming data at petabyte scale, where changes captured in an RDBMS had to be transmitted to a data lake via CDC (change data capture) or binlogs. This required transforming the batch-oriented data in tables into streams for managing key business functions like trips, driver payouts, and responding to customers.

Hudi first went into production at Uber in 2016 and was open-sourced in 2017 to encourage community development. Since entering the Apache Incubator in 2019 (and graduating to a top-level Apache project in 2020), Hudi has captured the imagination of many data-rich organizations, including Disney, ServiceNow, Tathastu.ai, Tesla, Walmart, and many more.

Why Apache Hudi?

Hudi’s genesis stemmed from addressing Uber’s evolving data platform needs, including incremental data processing, data versioning, and efficient data ingestion.

Hudi’s core capability connects relational databases to data lake storage layers like S3 and HDFS via CDC or binlog, making it possible to capture data changes efficiently. This was crucial at a time when alternatives for updating data on S3 were scarce, and Hudi played a pivotal role in shaping modern data lakes. Today, it seamlessly integrates with popular query engines such as Trino, PrestoDB, and Spark.

There are many challenges that Hudi tackles. These range from scalability limits in HDFS to the need for faster data delivery in Hadoop and SQL read/write support across multiple frameworks like Spark, Presto, Trino, and Hive.

With Hudi, data engineering teams are able to run incremental queries and consume record-level change streams. It also offers support for streaming ingestion and built-in CDC sources and tools. Hudi also features built-in metadata tracking for efficient storage access and excels at transactions, rollbacks, concurrency control, upserts, and deletions with fast, pluggable indexing.

  1. Hudi solves the scalability limit in HDFS
  2. It enables faster data delivery in Hadoop
  3. Hudi supports SQL reads/writes from Spark, Presto, Trino, Hive, and many more frameworks
  4. It lets you perform incremental queries and consume record-level change streams
  5. Streaming ingestion and built-in CDC sources and tools are available
  6. Built-in metadata tracking is available for scalable storage access
  7. Transactions, rollbacks, concurrency control, upserts, and deletes with fast, pluggable indexing are some of Hudi’s standout features

Hudi Use Cases

  1. Near Real-time Data Ingestion: While not strictly real-time, Hudi offers resource-efficient alternatives like upserts for RDBMS data ingestion. The DeltaStreamer tooling makes it easy to scale out to more sources.
  2. Incremental Processing: Hudi addresses the challenge of late-arriving data by letting downstream jobs consume only the records that changed, rather than reprocessing entire tables or partitions on a fixed schedule.
  3. New Batch and Streaming Architecture: Hudi introduces data streaming principles, enabling faster data ingestion than traditional architectures and supporting incremental processing pipelines. Its resource-friendly approach enhances analytics performance at reduced operational cost.
  4. Data Warehousing: Apache Hudi provides a strong foundation for scalable data warehousing. It supports ACID transactions and incremental processing, ensuring up-to-date data warehouses.
  5. Schema Evolution: Hudi leverages Avro for dynamic schema alterations, ensuring data pipelines remain consistent and robust in the face of schema changes without compromising backward compatibility (a small sketch follows this list).
  6. Data Quality Management: With ACID compliance and schema evolution, Apache Hudi is well suited to maintaining data quality across diverse sources.
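As a minimal sketch of backward-compatible schema evolution, the PySpark snippet below reuses the quickstart table (tableName, basePath, dataGen, and hudi_options) that is set up in the hands-on section later in this article; the extra tip_amount column is a hypothetical addition for illustration only.

# PySpark

# Hedged sketch: append records that carry an extra, hypothetical "tip_amount" column.
# Hudi treats the new nullable column as a backward-compatible schema change;
# older records simply read back with null for it.
from pyspark.sql.functions import lit

more_inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(5))

evolved_df = spark.read.json(spark.sparkContext.parallelize(more_inserts, 2)).withColumn("tip_amount", lit(0.0))

evolved_df.write.format("hudi").options(**hudi_options).mode("append").save(basePath)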

Apache Hudi Architecture

Apache Hudi’s architecture is designed to provide a comprehensive solution for managing data in a modern data lake environment. It consists of several key components that seamlessly work together to enable efficient data operations and analytics.

  1. Data Sources: Apache Hudi can ingest data from various sources, including data streams, databases, and cloud storage. This versatility ensures compatibility with a wide range of data inputs.
  2. Hudi Core: The heart of Apache Hudi comprises several essential components:
  • ACID Guarantees: Just like databases, Hudi offers ACID (Atomicity, Consistency, Isolation, Durability) guarantees, ensuring data integrity and reliability.
  • Incremental Pipelines: Hudi introduces efficient incremental data processing, enabling low-latency analytics.
  • Multimodal Indexes: Multimodal indexing supports various optimized query types and patterns.
  • Managed Tables: Hudi manages data in tables, providing structure and organization within the lake.

3. Lakehouse Platform: Apache Hudi fits seamlessly into a “lakehouse” platform, combining data lake and data warehouse capabilities to offer the best of both worlds: efficient data storage and analytics.

4. Metadata: Metadata management is crucial for tracking and organizing data. Hudi has an efficient metadata management system to keep track of changes to schemas and table structures.

5. Data Sinks: Data processed through Apache Hudi can be directed to various data sinks, including BI analytics tools, interactive analytics platforms, batch processing systems, and stream analytics solutions. This flexibility ensures that data is available for a wide range of analytics and reporting purposes.

6. Simplified Orchestration: Orchestrating data pipelines gets simplified with Apache Hudi, as it integrates well with orchestration tools to automate data workflows.

Hudi Storage Layout

Apache Hudi employs a well-organized, directory-based storage layout to efficiently manage data within Hudi tables.

Directory structures provide several advantages in storage management. A well-designed directory structure enhances data organization, making it easier to locate and manage files and folders efficiently. It also allows easy and efficient data categorization, security, and systematic role-based access, and it enables efficient incremental data backup and recovery processes. Moreover, it simplifies collaboration, as teams can easily navigate and work on shared data. Overall, directory structures contribute to improved data management, reduced storage costs, and enhanced data security.

Understanding Hudi’s storage layout is essential for efficient data management and retrieval within Hudi tables.

Let’s take a closer look at how Hudi structures its storage:

  1. Base Path: Hudi tables are organized within a base path, which serves as the root directory for the entire Hudi table.
  2. Partitioning: Data within Hudi tables can be organized either with a flat, non-partitioned structure or based on coarse-grained partitioning values defined for the table. In non-partitioned tables, all data files are stored directly under the base path.
  3. Partition Columns: Unlike traditional Hive-style partitioning, Hudi retains partition columns within data files. Partitioning in Hudi primarily serves as an organization mechanism for data files.
  4. .hoodie Directory: A special reserved directory named “.hoodie” resides within the base path. This directory is used for storing transaction logs and critical metadata related to Hudi tables.
  5. hoodie.properties file: Hudi uses a file named “hoodie.properties” to store table-level configurations. These configurations are shared by the writers and the readers of Hudi tables.
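To make this concrete, here is a hedged, purely illustrative sketch of what a small partitioned Hudi table might look like on disk; the base path, partition values, commit file name, and data file name are assumptions, not output from a real table:

/tmp/hudi_trips_cow/                              <- base path (root of the table)
  .hoodie/                                        <- reserved directory: timeline and table metadata
    hoodie.properties                             <- table-level configurations shared by writers and readers
    20231219120000000.commit                      <- a completed commit on the timeline (illustrative name)
  americas/united_states/san_francisco/           <- partition path derived from the partition columns
    <file-id>_<write-token>_<instant>.parquet     <- columnar base file (partition columns are retained inside)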

Tables and Query Types

Hudi table types dictate the indexing and layout of data within the distributed file system, shaping the implementation of core primitives and timeline activities that interact with this structure, particularly influencing data writing processes. Additionally, query types define the manner in which the underlying data becomes accessible to queries.

Table Types

Hudi supports Copy on Write and Merge on Read Table Types.

Copy on Write (CoW)

Hudi stores data in a directory structure in which each set of files represents a snapshot of the data at a particular point in time. When new data is written to a CoW table, Hudi does not modify the existing files in place; instead, it writes new versions of the affected files that merge the existing records with the incoming changes. Each such file version is called a “file slice.”

Hudi also keeps track of the changes made to each file slice, and this information is stored as metadata. When you run an incremental query against a Hudi table, Hudi reads only the files that have changed since the given commit, which makes such queries much faster than re-reading the entire table every time. A CoW table stores data exclusively in columnar file formats (e.g., Parquet). The CoW table type is recommended for analytical workloads, which are predominantly read-heavy.

Merge on Read (MoR)

The MoR table type allows quick data ingestion and querying in near real time. For these tables, Hudi stores data in a combination of columnar (Parquet) and row-based (Avro) formats.

Updates are logged to row-based delta files which are compacted and merged with columnar base files as needed to create new versions of the base files. It attempts to balance read and write amplification intelligently, to provide near real-time data. During compaction, it carefully chooses which delta log files need to be merged with their columnar base file, to maintain high query performance.

The MoR table type is recommended for near real-time workloads, as it balances both read and write performance.

Differences between CoW and MoR Tables

Let’s see the trade-off between the two:

In CoW datasets, whenever a record is updated, Apache Hudi rewrites the entire file containing that record with the updated values. This approach ensures that the data remains immutable, preserving a historical snapshot of changes. CoW tables are ideal for scenarios where data changes infrequently, and the workload is primarily read-heavy. For example, in a financial institution, a CoW table could be used to store historical transaction data. Even when updates occur, the original data remains intact, providing a reliable audit trail for compliance purposes.

On the other hand, in MoR datasets, Apache Hudi takes a different approach. When a record is updated, it writes only the changed records to row-based delta log files, minimizing data duplication. MoR tables are well-suited for workloads characterized by frequent writes or changes and fewer read operations. For instance, in an e-commerce platform, a MoR table can efficiently handle constant updates to product inventory. Instead of rewriting entire files, Hudi appends the changes, optimizing write performance.

In summary, CoW tables in Apache Hudi prioritize data immutability and are best for read-heavy scenarios with infrequent data changes, while MoR tables focus on efficient updates and are suitable for write-heavy workloads with frequent data modifications. The choice between them depends on the specific use case and workload requirements.
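In code, choosing between the two comes down to a single write option. Below is a minimal sketch using standard Hudi write configuration keys; the table name and the compaction tuning value are illustrative assumptions rather than recommendations.

# PySpark

# Hedged sketch: selecting the table type at write time.
mor_options = {
    'hoodie.table.name': 'hudi_trips_mor',                        # illustrative table name
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',        # or 'COPY_ON_WRITE' (the default)
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.precombine.field': 'ts',
    # For MoR tables, inline compaction can be tuned; this key controls how many
    # delta commits accumulate before a compaction is scheduled (value is an assumption).
    'hoodie.compact.inline.max.delta.commits': 5
}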

Query types

Apache Hudi offers a variety of query types to cater to different data access and analysis requirements. Let’s explore the key query types in Hudi:

  1. Snapshot Queries: Snapshot Queries retrieve the current state of data as of a specific commit or timestamp. They provide a consistent view of the data at the chosen point in time. Snapshot queries are useful for regular analytics and reporting.
  2. Incremental Queries: Incremental Queries fetch only the changes made to the data since the last commit. They enable low-latency analytics by reading and processing only the new or modified data. Incremental queries are ideal for scenarios requiring real-time or near-real-time data access.
  3. Read-Optimized Queries: Read-Optimized Queries take advantage of the optimized storage layout in Hudi. They read only the columnar base files (e.g., Parquet) to improve query performance. Read-optimized queries are suitable for analytical workloads requiring efficient data retrieval, and they are only available for MoR tables.

These three are the primary types of queries that we can execute within Hudi. Beyond these, Hudi supports one more query type: the Time Travel Query.

Time Travel Queries allow users to query data at any historical point in time using the Timeline. Users can travel back in time to analyze the data as it existed at past commits. Time travel queries are valuable for auditing, compliance, and historical analysis.
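As a minimal sketch (assuming the trips table created in the hands-on section below), a time travel read in PySpark only needs the as.of.instant read option; the timestamp shown is illustrative and should be replaced with an instant that actually exists on your table’s timeline.

# PySpark

# Hedged sketch: read the table as it existed at a past instant on the timeline.
time_travel_df = spark.read.format("hudi").option("as.of.instant", "20231219120000000").load(basePath)

time_travel_df.show()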

The availability of multiple query types in Apache Hudi ensures that users can choose the most appropriate method for their specific use cases and data access needs.

The query types available depend on the table type:

  • Copy on Write (CoW) tables support Snapshot Queries and Incremental Queries; since all data lives in columnar base files, snapshot reads are already read-optimized.
  • Merge on Read (MoR) tables support Snapshot Queries, Incremental Queries, and Read-Optimized Queries; read-optimized reads expose only the compacted columnar base files, trading some data freshness for query performance.

Getting started with Hudi on Spark

Installing Apache Hudi on Spark

Let’s see how we can start using Hudi from the Spark shell. Hudi works with Spark 2.4.3+ and Spark 3.x.

Run the following command to download the required packages and start PySpark with Hudi:

# starting PySpark 3 with Hudi

$ pyspark --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

Now, run the following command to instantiate a Scala Spark shell with Hudi:

# starting Scala Spark Shell 3 with Hudi

$ spark-shell --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

Note: In all further labs, we will only code in PySpark.

Setting up the Table

Let’s see how we can create a table using Hudi within PySpark.

There is no separate create table command required in Spark. The very first batch of writes to a table will create the table if it does not already exist.

So, let’s set up the table name, base path and a data generator to generate records to be used in later commands.

# PySpark

tableName = "hudi_trips_cow"

basePath = "/tmp/hudi_trips_cow"

dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

Adding and Querying Data

Now, we will generate new sample records for the trips table using the data generator, load them into a DataFrame, define some Hudi options, and write the DataFrame to the Hudi table. Then, we will read the data back into a DataFrame.

# PySpark

inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))

df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)

Now, load the table into a DataFrame and run some queries against it.

# PySpark

tripsSnapshotDF = spark.read.format("hudi").load(basePath)

# Creating the temp view

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

# Applying some sql queries on tempview

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()

# PySpark

# Applying some sql queries on tempview

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Updating the Data

Now, let’s explore how we can update data in Apache Hudi. This is similar to inserting new data: generate updates to existing trips using the data generator tool, load them into a DataFrame, and finally write the DataFrame to the Hudi table.

# PySpark

# Generating updates for existing trips

updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))

df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

df.write.format("hudi").options(**hudi_options).mode("append").save(basePath)

Now, create a temp view and query it.

# PySpark

tripsSnapshotDF = spark.read.format("hudi").load(basePath)

# Creating the temp view

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

# Applying some sql queries on tempview

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()

# PySpark

# Applying some sql queries on tempview

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Comparing Results

Hudi provides the capability to obtain a stream of records that changed since the given commit timestamp. This can be achieved using Hudi’s incremental querying and providing a beginning time from which changes need to be streamed. We do not need to specify the endTime if we want all changes after the given commit.

First of all, let’s reload the data.

# PySpark

spark.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

Now, let’s inspect the commit timeline and pick the commit time we want to stream changes from.

# PySpark

commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").limit(50).collect()))

beginTime = commits[len(commits) - 2]  # commit time we are interested in

Applying Incremental Queries

Let’s apply an incremental query to the data using Hudi.

# PySpark

incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime
}

tripsIncrementalDF = spark.read.format("hudi").options(**incremental_read_options).load(basePath)

# creating temp view

tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

# PySpark

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
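If we do want to bound the read on both sides, a point-in-time query can be sketched by also supplying an end instant. The option keys below are standard Hudi read configurations; using "000" as the begin time simply means “from the start of the timeline,” and the view name is an illustrative choice.

# PySpark

# Hedged sketch: point-in-time query between two instants on the timeline.
point_in_time_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '000',      # from the beginning of the timeline
    'hoodie.datasource.read.end.instanttime': beginTime     # up to the commit chosen above
}

tripsPointInTimeDF = spark.read.format("hudi").options(**point_in_time_options).load(basePath)

tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()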

Deleting Data

Now, let’s explore how we can delete data.

# PySpark

# fetch total records count

spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

# fetch two records to be deleted

ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

Define the options for deleting the data:

# PySpark

# issue deletes

hudi_delete_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

from pyspark.sql.functions import lit

deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))

df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))

df.write.format("hudi").options(**hudi_delete_options).mode("append").save(basePath)

Now let’s run the same read query as above to confirm whether the data has actually been deleted.

# PySpark

# run the same read query as above.

roAfterDeleteViewDF = spark.read.format("hudi").load(basePath)

roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")

# fetch should return (total - 2) records

spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

Earlier, when we ran the same count query, we got 10 records as the result; this time we get 8.

Conclusion

Apache Hudi is a great framework for managing the storage of large analytical datasets on HDFS and cloud object stores, with the prime purpose of decreasing data latency while ingesting data with high efficiency. Features such as ACID transaction support, concurrency control, upserts, deletes, and fast, pluggable indexing make Hudi a compelling foundation for modern data lakes.

Written by DataCouch

We are a team of Data Scientists who provide training and consultancy services to professionals worldwide. LinkedIn: https://in.linkedin.com/company/datacouch