This is part of a three-post series on Kudu, a new data storage system from Cloudera.
- Part 1 is an overview of Kudu technology.
- Part 2 (this post) is a lengthy dive into how Kudu writes and reads data.
- Part 3 is a brief speculation as to Kudu’s eventual market significance.
Let’s talk in more detail about how Kudu stores data.
- As previously noted, inserts land in an in-memory row store, which is periodically flushed to the column store on disk. Queries are federated between these two stores. Vertica taught us to call these the WOS (Write-Optimized Store) and ROS (Read-Optimized Store) respectively, and I’ll use that terminology here.
- Part of the ROS is actually another in-memory store, aka the DeltaMemStore, where updates and deletes land before being applied to the DiskRowSets. These stores are managed separately for each DiskRowSet. DeltaMemStores are checked at query time to confirm whether what’s in the persistent store is actually up to date. (I sketch this in code right after this list.)
- A major design goal for Kudu is that compaction should never block, nor greatly slow, other work. In support of that:
- Compaction is done, server-by-server, via a low-priority but otherwise always-on background process.
- There is a configurable maximum to how big a compaction process can be; more precisely, the limit is on how much data the process can work on at once. The current default is 128 MB, which is 4X the size of a DiskRowSet.
- When a compaction finishes, Kudu runs a little optimization to figure out which 128 MB to compact next.
- Every tablet has its own write-ahead log.
- This creates a practical limitation on the number of tablets …
- … because each tablet is causing its own stream of writes to “disk” …
- … but it’s only a limitation if your “disk” really is all spinning disk …
- … because multiple simultaneous streams work great with solid-state memory.
- Log retention is configurable, typically the greater of 5 minutes or 128 MB.
- Metadata is cached in RAM. Therefore:
- ALTER TABLE kinds of operations that can be done by metadata changes only (i.e. adding/dropping/renaming columns) can be instantaneous.
- To keep from being screwed up by this, the WOS maintains a column that labels rows by which schema version they were created under. I immediately called this MSCC (Multi-Schema Concurrency Control), and Todd Lipcon agreed.
- Durability, as usual, boils down to “Wait until a quorum has done the writes”, with a configurable option as to what constitutes a “write”.
- Servers write to their respective write-ahead logs, then acknowledge having done so.
- If it isn’t too much of a potential bottleneck (e.g. if persistence is on flash), the acknowledgements may wait until the log has been fsynced to persistent storage.
- There’s a “thick” client library which, among other things, knows enough about the partitioning scheme to go straight to the correct node(s) on a cluster.
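To make the WOS/ROS split and the per-DiskRowSet DeltaMemStores a bit more concrete, here is a minimal Python sketch of the idea. It is purely my illustration, not Kudu’s code or API; the class and field names are invented, and real DiskRowSets are columnar and far more elaborate.

```python
# Illustrative sketch only -- not Kudu's actual code or API.
# Inserts land in an in-memory row store (the WOS / MemRowSet); updates and
# deletes land in a per-DiskRowSet DeltaMemStore that is consulted at read
# time to bring the on-disk ("ROS") data up to date.

class DiskRowSet:
    def __init__(self, base_rows):
        self.base_rows = dict(base_rows)   # key -> row, as flushed from the WOS
        self.delta_mem_store = {}          # key -> changed columns, or None for a delete

    def update(self, key, changes):
        self.delta_mem_store[key] = changes

    def delete(self, key):
        self.delta_mem_store[key] = None

    def read(self, key):
        if key not in self.base_rows:
            return None
        delta = self.delta_mem_store.get(key, {})
        if delta is None:                  # deleted since the flush
            return None
        return {**self.base_rows[key], **delta}


class Tablet:
    def __init__(self):
        self.mem_row_set = {}              # the WOS: recent inserts, not yet flushed
        self.disk_row_sets = []            # the ROS: flushed data (columnar in real Kudu)

    def insert(self, key, row):
        self.mem_row_set[key] = row

    def flush(self):
        """Periodically move the WOS contents into a new DiskRowSet."""
        if self.mem_row_set:
            self.disk_row_sets.append(DiskRowSet(self.mem_row_set))
            self.mem_row_set = {}

    def read(self, key):
        """A query is federated across the WOS and all the DiskRowSets."""
        if key in self.mem_row_set:
            return self.mem_row_set[key]
        for drs in self.disk_row_sets:
            row = drs.read(key)
            if row is not None:
                return row
        return None


t = Tablet()
t.insert("k1", {"metric": 1})
t.flush()                                          # "k1" now lives in a DiskRowSet
t.disk_row_sets[0].update("k1", {"metric": 2})     # goes to that DiskRowSet's DeltaMemStore
print(t.read("k1"))                                # {'metric': 2} -- delta applied at read time
```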
Leaving aside the ever-popular possibilities of:
- Cluster-wide (or larger) equipment outages
the main failure scenario for Kudu is:
- The leader version of a tablet (within its replica set) goes down.
- A new leader is elected.
- The workload is such that the client didn’t notice and adapt to the error on its own.
Todd says that Kudu’s MTTR (Mean Time To Recovery) for write availability tests internally at 1-2 seconds in such cases, and shouldn’t really depend upon cluster size.
Beyond that, I had some difficulties understanding details of the Kudu write path(s). An email exchange ensued, and Todd kindly permitted me to post some of his own words (edited by me for clarification and format).
Every tablet has its own in-memory store for inserts (MemRowSet). From a read/write path perspective, every tablet is an entirely independent entity, with its own MemRowSet, rowsets, etc. Basically the flow is:
- The client wants to make a write (i.e. an insert/update/delete), which has a primary key.
- The client applies the partitioning algorithm to determine which tablet that key belongs in.
- The information about which tablets cover which key ranges (or hash buckets) is held in the master. (But since it is cached by the clients, this is usually a local operation.)
- The client sends the operation to the “leader” replica of the correct tablet (batched along with any other writes that are targeted to the same tablet).
- Once the write reaches the tablet leader:
- The leader enqueues the write to its own WAL (Write-Ahead Log) and also enqueues it to be sent to the “follower” replicas.
- Once it has reached a majority of the WALs (i.e. 2 of 3 when the replication factor is 3), the write is considered “replicated”. That is to say, it’s durable and would always be rolled forward, even if the leader crashed at this point.
- Only now do we enter the “storage” part of the system, where we start worrying about MemRowSets vs DeltaMemStores, etc.
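To restate that flow as code, here is a minimal sketch of my own; it is not Todd’s wording and not Kudu’s actual code or API, and the hash-based routing and class names are stand-ins for illustration:

```python
# Illustrative write-path sketch: route by primary key, log on the leader's WAL,
# replicate to followers, and only apply to storage once a majority has logged it.
import hashlib

class Replica:
    def __init__(self, name):
        self.name = name
        self.wal = []                        # this replica's write-ahead log
        self.mem_row_set = {}                # tablet storage, drastically simplified

    def append_to_wal(self, op):
        self.wal.append(op)                  # real servers ack after the log write (or fsync)
        return True

    def apply(self, op):
        self.mem_row_set[op["key"]] = op["row"]


class TabletReplicaSet:
    def __init__(self, replicas):
        self.leader, *self.followers = replicas

    def write(self, op):
        # 1. The leader logs the op and also sends it to the followers.
        acks = 1 if self.leader.append_to_wal(op) else 0
        for f in self.followers:
            acks += 1 if f.append_to_wal(op) else 0
        # 2. "Replicated" once a majority of WALs have it (e.g. 2 of 3).
        majority = (len(self.followers) + 1) // 2 + 1
        if acks < majority:
            raise RuntimeError("write not replicated to a majority")
        # 3. Only now does the op enter the storage layer (MemRowSets etc.).
        for r in (self.leader, *self.followers):
            r.apply(op)


def route(primary_key, tablets):
    """Stand-in for the client library's partitioning: primary key -> tablet."""
    h = int(hashlib.md5(primary_key.encode()).hexdigest(), 16)
    return tablets[h % len(tablets)]


tablets = [TabletReplicaSet([Replica(f"t{i}-r{j}") for j in range(3)])
           for i in range(4)]
route("row-17", tablets).write({"key": "row-17", "row": {"metric": 42}})
```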
Put another way, there is a fairly clean architectural separation into three main subsystems:
- Metadata and partitioning (map from a primary key to a tablet, figure out which servers host that tablet).
- Consensus replication (given a write operation, ensure that it is durably logged and replicated to a majority of nodes, so that even if we crash, everyone will agree whether it should be applied or not).
- Tablet storage (now that we’ve decided a write is agreed upon across replicas, actually apply it to the database storage).
These three areas of the code are separated as much as possible — for example, once we’re in the “tablet storage” code, it has no idea that there might be other tablets. Similarly, the replication and partitioning code don’t know much of anything about MemRowSets, etc. – that’s entirely within the tablet layer.
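If it helps, that separation could be sketched as three narrow interfaces, along the following lines. The naming is mine, not Kudu’s:

```python
# Illustrative interface sketch of the three subsystems -- my naming, not Kudu's.
from abc import ABC, abstractmethod

class PartitioningLayer(ABC):
    @abstractmethod
    def tablet_for_key(self, primary_key):
        """Map a primary key to a tablet, and find the servers hosting that tablet."""

class ConsensusLayer(ABC):
    @abstractmethod
    def replicate(self, tablet, write_op):
        """Durably log the write and get a majority of replicas to agree on it."""

class TabletStorage(ABC):
    @abstractmethod
    def apply(self, write_op):
        """Apply an already-agreed-upon write to one tablet's MemRowSet / DeltaMemStores."""
```

The point is just that the storage layer sees one tablet’s writes in isolation; it never needs to know how agreement was reached or that other tablets exist.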
As for reading — the challenge isn’t in the actual retrieval of the data so much as in figuring out where to retrieve it from. What I mean by that is:
- Data will always be either in memory or in a persistent column store. So I/O speed will rarely be a problem.
- Rather, the challenge to Kudu’s data retrieval architecture is finding the relevant record(s) in the first place, which is slightly more complicated than in some other systems. Given the requested primary key, Kudu still has to:
- Find the correct tablet(s).
- Find the record(s) on the (rather large) tablet(s).
- Check various in-memory stores as well.
The “check in multiple places” problem doesn’t seem to be of much concern, because:
- All that needs to be checked is the primary key column.
- The on-disk data is front-ended by Bloom filters.
- The cases in which a Bloom filter returns a false positive are generally the same busy ones where the key column is likely to be cached in RAM.
- Cloudera just assumes that checking a few different stores in RAM isn’t going to be a major performance issue.
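A small sketch of the Bloom-filter front-ending may help. The toy filter and the names below are mine; Kudu’s actual filters and key columns are of course more sophisticated:

```python
# Illustrative sketch: a toy Bloom filter front-ending per-DiskRowSet key lookups.
import hashlib

class ToyBloomFilter:
    def __init__(self, size_bits=1024, num_hashes=3):
        self.size, self.num_hashes = size_bits, num_hashes
        self.bits = [False] * size_bits

    def _positions(self, key):
        for i in range(self.num_hashes):
            digest = hashlib.md5(f"{i}:{key}".encode()).hexdigest()
            yield int(digest, 16) % self.size

    def add(self, key):
        for p in self._positions(key):
            self.bits[p] = True

    def might_contain(self, key):
        # False means "definitely absent"; True means "worth probing the key column".
        return all(self.bits[p] for p in self._positions(key))


def point_lookup(key, mem_row_set, disk_row_sets):
    """Check the in-memory store first, then only the DiskRowSets whose filters match."""
    if key in mem_row_set:
        return mem_row_set[key]
    for drs in disk_row_sets:
        if drs["bloom"].might_contain(key):      # most DiskRowSets get skipped here
            row = drs["rows"].get(key)           # stands in for the key-column probe
            if row is not None:
                return row
    return None
```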
When it comes to searching the tablets themselves:
- Kudu tablets feature data skipping among DiskRowSets, based on value ranges for the primary key.
- The whole point of compaction is to make the data skipping effective.
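For scans, the analogous pruning is by primary-key range rather than by Bloom filter; a sketch (again mine, not Kudu’s code):

```python
# Illustrative sketch: skip DiskRowSets whose primary-key range can't overlap the scan.
def range_scan(key_lo, key_hi, disk_row_sets):
    results = []
    for drs in disk_row_sets:
        # Each DiskRowSet is assumed to know the min/max primary key it holds.
        if drs["max_key"] < key_lo or drs["min_key"] > key_hi:
            continue                             # skipped entirely; no I/O needed
        results.extend(row for key, row in sorted(drs["rows"].items())
                       if key_lo <= key <= key_hi)
    return results
```

Compaction keeps those per-DiskRowSet key ranges from overlapping too much, which is what makes the skipping pay off.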
Finally, Kudu pays a write-time (or compaction-time) cost to boost retrieval speeds from inside a particular DiskRowSet, by creating something that Todd called an “ordinal index” but agreed with me would be better called something like “ordinal offset” or “offset index”. Whatever it’s called, it’s an index that tells you the number of rows you would need to scan before getting to the one you want, thus allowing you to retrieve it (except for the cost of an index probe) at array speeds.
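As I understand it, the idea is roughly the following (my own simplified sketch, not Kudu’s on-disk format): a small index maps a primary key to its row ordinal within the DiskRowSet, after which each needed column can be read at that position directly.

```python
# Illustrative sketch of an "offset index": primary key -> row ordinal -> direct
# positional reads from each column.
import bisect

class ColumnarRowSet:
    def __init__(self, sorted_rows):
        # Rows arrive sorted by primary key, as in a flushed/compacted DiskRowSet.
        self.keys = [k for k, _ in sorted_rows]          # the key/offset index
        self.columns = {}                                # column name -> list of values
        for _, row in sorted_rows:
            for col, val in row.items():
                self.columns.setdefault(col, []).append(val)

    def ordinal(self, key):
        """The index probe: how many rows precede the one we want."""
        i = bisect.bisect_left(self.keys, key)
        return i if i < len(self.keys) and self.keys[i] == key else None

    def get(self, key, cols):
        i = self.ordinal(key)
        if i is None:
            return None
        return {c: self.columns[c][i] for c in cols}     # array-speed positional reads


rs = ColumnarRowSet([("a", {"x": 1, "y": 10}), ("b", {"x": 2, "y": 20})])
print(rs.get("b", ["y"]))    # {'y': 20}
```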