The ACID table storage layer: a thorough conceptual comparison between Delta Lake and Apache Hudi (part 1)
While I was doing my data engineer internship at Cathay Financial Holdings, I spent most of my time researching the difference between Delta Lake and Apache Hudi. My mentor and I found this emerging technology could help the bank detect fraud in real time in an ATM withdrawal scenario if it went to production. It impressed me that we could save the institution millions of dollars, considering the significant amount of fraud that happens each year.
In 2020 I was a data engineering intern at Cathay's Digital, Data & Technology center (DDT). Most of my time went into research, plus events, uh, and even more events… This post focuses on the topic I worked on in the lab: the ACID table storage layer and how different frameworks implement it. There is a lot to cover, so I plan to split it into two stories. Most of it is in English because, honestly, I was too lazy to translate it later XDD. Anyway, enough small talk!
OK, let’s jump right into it.
According to Amazon Web Services, a data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. It is highly scalable, elastic, and schema-on-read, and usually takes the form of HDFS, Azure Data Lake Storage, AWS S3, etc. Data lakes were meant to solve the problems presented by data warehouses; however, as data grows faster, bigger, and more complicated, they have become a mess for many big data developers.
So, what are the problems here? Mainly, there are four challenges that, if not dealt with properly, can turn your data lake into a data swamp.
- No ACID support
Data lakes usually serve multiple concurrent read and write operations, and the data is not guaranteed to be correct and consistent for every reader.
- Missing schema management
Your data and its metadata are continuously changing, and it is hard for a data engineering team to keep track of every table schema in the company. The task only becomes more tedious as time goes by.
- Small files problem
Traditionally, streaming data into a data lake generates an enormous number of small files, which significantly degrade query performance in Apache Spark or Hadoop MapReduce.
- Inefficient data skipping
If the data is too big, it is common practice to partition the table. However, partitioning is only effective on chronological or low-cardinality columns, and even then individual partitions can still grow too big.
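To make the partitioning point concrete, here is a toy sketch of Hive-style partition pruning in plain Python; the paths and the `prune` helper are hypothetical illustrations, not any framework's API.

```python
# Hive-style partitioning in miniature: files are laid out by a date column,
# so a query filtered on that column can skip whole directories.
files = [
    "sales/date=2020-01-01/part-0.parquet",
    "sales/date=2020-01-01/part-1.parquet",
    "sales/date=2020-01-02/part-0.parquet",
    "sales/date=2020-01-03/part-0.parquet",
]

def prune(files, date):
    """Keep only the files a query filtered on one date needs to read."""
    return [f for f in files if f"date={date}/" in f]

needed = prune(files, "2020-01-02")
# Only 1 of 4 files is scanned -- but this only works because `date` is a
# low-cardinality column; partitioning on, say, a user ID would instead
# explode the table into countless tiny partitions.
```

This is exactly why partitioning alone is not enough: it helps only for filters on the partition column, which motivates the finer-grained data skipping discussed later.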
To tackle the above problems, Delta Lake and Apache Hudi, as well as other frameworks, propose solutions implemented with different philosophies at their core. In this article, I will introduce Delta Lake and Apache Hudi in particular, because they are the most popular in this category.
Features and Implementation Details
Let’s first talk about Delta Lake. Its official website defines it as an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads. A table in Delta Lake is both a batch table and a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.
Hudi’s official website defines it as bringing stream processing to big data, providing fresh data while being an order of magnitude more efficient than traditional batch processing. This is the so-called incremental processing, sometimes called macro-batch streaming: basically, dealing with a little bit of data at a time. Hudi implements it as DeltaStreamer rather than relying on existing streaming frameworks such as Apache Flink or Spark Structured Streaming.
With this basic understanding in mind, we could move forward to the features and implementation details.
Transaction model — ACID support
Both Delta Lake and Apache Hudi provide ACID properties on tables, which means they record every action you perform on a table and generate metadata alongside the data itself. Take Delta Lake’s implementation as an example.
A table is the result of a set of actions; that ordered record of actions is called the transaction log in Delta Lake.
Below are the possible actions:
- Update Metadata
- Add File
- Remove File
- Set Transaction
- Change Protocol
- Commit Info
Replaying these actions yields the table’s current state: the current metadata, the list of files, the list of transactions, and the version.
If you look at the JSON files in _delta_log, note that changes to the table are stored as ordered, atomic units called commits. A commit is either fully created or not at all, which is how atomicity is implemented.
Think of it like a Git commit: you can go back to any point at which your data was persisted.
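The replay described above can be sketched in a few lines of plain Python. The action shapes below mimic, but greatly simplify, Delta’s actual _delta_log JSON; the commit contents and file names are made up for illustration.

```python
# A hypothetical sequence of Delta-style commits, each a list of actions.
# Real _delta_log files are named 00000000000000000000.json and hold
# newline-delimited actions; this is a simplified stand-in.
commits = [
    [{"metaData": {"schemaString": "id INT, amount DOUBLE"}},
     {"add": {"path": "part-0000.parquet"}}],
    [{"add": {"path": "part-0001.parquet"}}],
    [{"remove": {"path": "part-0000.parquet"}},
     {"add": {"path": "part-0002.parquet"}}],
]

def replay(commits):
    """Fold every commit, in order, into the table's current state."""
    metadata, files = None, set()
    for commit in commits:
        for action in commit:
            if "metaData" in action:
                metadata = action["metaData"]      # Update Metadata
            elif "add" in action:
                files.add(action["add"]["path"])   # Add File
            elif "remove" in action:
                files.discard(action["remove"]["path"])  # Remove File
    return {"metadata": metadata,
            "files": sorted(files),
            "version": len(commits) - 1}

state = replay(commits)
# part-0000 was added and later removed, so only two files are live,
# and the table is at version 2 (the index of the latest commit).
```

Time travel falls out naturally from this design: replaying only the first N commits reconstructs the table as of version N-1.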
When it comes to isolation levels, both Delta Lake and Apache Hudi default to snapshot isolation, but with slightly different approaches. Delta Lake supports the strongest level, serializability; an intermediate level, write serializability; and the weakest, snapshot isolation. Apache Hudi only supports snapshot isolation. In simple terms, snapshot isolation lets multiple users see the same snapshot view of the table, so they all start a transaction from the same point.
In addition, Delta Lake and Apache Hudi use a similar OCC (Optimistic Concurrency Control) strategy. OCC assumes that multiple transactions can usually complete without interfering with each other. No locks are employed, so OCC achieves higher throughput.
Specifically, here’s the typical workflow implemented in Delta Lake.
1. Record start version
2. Record reads/writes
3. Attempt commit
4. If someone else wins, check if anything you read has changed.
5. Try again
Following the above procedure, OCC guarantees that multiple writers can simultaneously modify a table while seeing a consistent snapshot view of it, and that their writes end up in a serial order.
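The five-step workflow above can be sketched with a toy commit log in plain Python; the class and function names here are hypothetical illustrations, not Delta Lake’s actual API.

```python
class OptimisticLog:
    """A toy commit log: a commit succeeds only if no one else has
    committed since the version the writer started from."""
    def __init__(self):
        self.version = 0

    def try_commit(self, start_version):
        if start_version != self.version:
            return False          # someone else won the race
        self.version += 1
        return True

def write_with_occ(log, max_retries=5):
    for _ in range(max_retries):
        start = log.version       # 1. record start version
        # 2. stage reads/writes here (omitted in this sketch)
        if log.try_commit(start): # 3. attempt commit
            return log.version
        # 4. conflict: re-check whether anything we read changed
        # 5. then loop and try again
    raise RuntimeError("too many conflicting writers")

log = OptimisticLog()
v1 = write_with_occ(log)              # first writer commits at version 1
stale_start = 0                       # a writer that recorded version 0
rejected = log.try_commit(stale_start)  # its commit is rejected (False)
v2 = write_with_occ(log)              # the retry path succeeds at version 2
```

Note that no locks appear anywhere; conflicts are detected only at commit time, which is precisely why OCC trades occasional retries for higher throughput.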
Unlike Apache Spark’s and Apache Hive’s data persistence strategies, the data Hudi stores in HDFS or any other cloud storage service carries additional metadata to make the table types below work.
These are Copy-on-Write (CoW) and Merge-on-Read (MoR) tables. You can look at the animations below.
For the CoW table, you can see that commits are fully merged into the table during a write operation; this “compaction” happens right away. No log files are written, and each file slice contains only the base file (e.g., a single Parquet file constitutes one file slice).
There are also snapshot queries and incremental queries, shown at the bottom of the animation. A snapshot query reads the latest files of a given table, whereas an incremental query reads only a given time window.
1. An incremental query on a CoW table is analogous to Delta Lake’s time travel.
2. A CoW table consists of Apache Parquet files.
3. Delta Lake does not support MoR tables, which makes it less efficient in streaming scenarios.
For the MoR table, records are first written quickly to log files (Apache Avro) and later merged with the base file (Apache Parquet) by a compaction action on the timeline. Various query types are supported depending on whether the query reads the merged snapshot, the change stream in the logs, or the un-merged base file alone.
Unlike the CoW table, there is one more way to query the table: the read-optimized query. It accesses only the base file, providing data as of the last compaction action performed on a given file slice. An incremental query on a MoR table, in contrast, merges the latest base file and its log files across all file slices in a given table or table partition. In my opinion, if data freshness is not your priority, a read-optimized query often takes much less time to complete.
1. MoR compaction can be triggered manually or automatically, depending on the compaction policy; CoW compaction happens right after the write operation.
2. A MoR table consists of Apache Parquet files for optimized queries and Apache Avro files for fast data ingestion.
In case you are not familiar with Apache Parquet and Apache Avro: according to this article, Apache Parquet is column-oriented, which is especially good for queries that read particular columns from a “wide” (many-column) table, since only the needed columns are read and I/O is minimized. Apache Avro, on the other hand, is row-oriented and efficient for writes in streaming scenarios. In both formats, the data itself is stored in binary, making it compact and efficient.
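The row-versus-column trade-off can be seen in miniature with plain Python lists and dicts; this is a toy model of the layouts, not Parquet’s or Avro’s actual encoding.

```python
# The same three records, stored two ways.
rows = [                            # row-oriented (Avro-like) layout
    {"id": 1, "amount": 9.5,   "country": "TW"},
    {"id": 2, "amount": 120.0, "country": "US"},
    {"id": 3, "amount": 33.3,  "country": "TW"},
]
columns = {                         # column-oriented (Parquet-like) layout
    "id":      [1, 2, 3],
    "amount":  [9.5, 120.0, 33.3],
    "country": ["TW", "US", "TW"],
}

# Appending a record is one operation in the row layout...
rows.append({"id": 4, "amount": 7.0, "country": "JP"})
# ...but touches every column in the columnar layout.
for col, value in (("id", 4), ("amount", 7.0), ("country", "JP")):
    columns[col].append(value)

# Conversely, scanning one column reads only that column's contiguous data
# here, whereas the row layout must touch every field of every record.
total = sum(columns["amount"])
```

This is the whole intuition behind MoR tables: append-friendly row files absorb the stream, and compaction converts them into scan-friendly columnar files.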
I have just explained these concepts in my own words; there is more depth to the above table types. For more information, check out Hudi’s design and architecture documentation.
But wait, what’s the point of implementing these two table types? This blog post from Uber gives you the gist of it. It says:
Before Hudi was implemented at Uber, large Apache Spark jobs periodically rewrote entire datasets to Apache HDFS to absorb upstream online table inserts, updates, and deletes, reflecting changes in the trip status.
Hudi’s copy-on-write feature enables us to perform file level updates, improving data freshness drastically.
Even using Hudi’s copy-on-write functionality, some of our tables received updates that were spread across 90 percent of the files, resulting in data rewrites of around 100 TB for any given large-scale table in the data lake. Since copy-on-write rewrites the entire file for even a single modified record, the copy-on-write functionality led to high write amplifications and compromised freshness, causing unnecessary I/O on our HDFS clusters and degrading disks much faster.
The Apache Hudi team at Uber developed a data compaction strategy for merge-on-read tables to convert recent partitions in a columnar format frequently, thereby limiting query side compute cost.
Wrapping it up
So far, we have talked about why we need ACID table storage layers, how the transaction log works, and what exactly CoW and MoR tables are. In part 2 of the series, I will dive into compaction, data skipping, streaming support, and schema management.
I hope you enjoyed reading this article. Thanks.