Sorting in Cassandra
The Cassandra distributed data store is, at heart, a key/value storage system. Unlike pure key/value storage systems, however, Cassandra implements first-class notions of key and column ordering. Dominic Williams covered a good chunk of the topic wonderfully in his post “Cassandra: RandomPartitioner vs OrderPreservingPartitioner”. But his framing of the question is a bit different from mine and was somewhat dominated by his focus on full-text search indexing.
You’ve got options
There are three ways to get Cassandra to order your data for you, each with its own costs and benefits.
- Use the OrderPreservingPartitioner and order rows by key. For example, given a log file structure, I could have a KeySpace named “Logger”, a Column Family named “Events”, and a list of events keyed by some form of timestamp (probably TimeUUID; more on this later). This technique aligns the physical structure of the database with your sort order.
- Use sortable column names and order columns within a row by the column name. In this case my Keyspace and Column family would be the same, but I would probably have a single row Key which I might call “EventsColumns” within the “Events” Column family. This row would have columns which are timestamps (actually TimeUUIDs). This technique stores all of the sorted items in a single row, which lives on a single machine, and thus is limited to the capabilities of that machine.
- Hack sorting on top of a random set of keys by using techniques like creating keys that are date named (find the most recent one by working backwards from today) or by partitioning a list of items into multiple rows.
Each of these strategies has its own strengths and weaknesses.
Sorting by Key
Sorting by key is enabled at the top level of the Cassandra conf/storage-conf.xml file. You specify the OrderPreservingPartitioner, which tells Cassandra to arrange the physical layout of the data according to the sort order of the keys. Typically this is done in a byte-oriented way, but you can customize Cassandra to sort however you like by creating your own OrderPreservingPartitioner.
The properties (good and bad) of the sort-by-key strategy emerge from the fact that all of Cassandra’s replication and sharding features are based upon key ranges. Even the files-on-disk are sorted by key.
Aligning the storage order with our sorting requirements gives us the following benefits:
- “Key range queries” allow us to fetch a pre-sorted slice of the key space based on a predicate. For example, a date range for date-keyed data, or an alphabetic range for word-keyed data.
- Using pagination techniques, it is possible to walk through the entire universe of keys (or any subset) forwards or backwards. With the RandomPartitioner, you would either have to walk through the keys in a random order (as might be appropriate in a MapReduce job) or have some external means of knowing the order of the keys.
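To make those two benefits concrete, here is a toy model of an ordered key space. It is only a sketch: the sorted Python list stands in for the key space, and the function names (key_range, page_forward, walk_all) are my own inventions for illustration, not part of any Cassandra client API.

```python
from bisect import bisect_left, bisect_right

# Toy stand-in for an ordered key space; this is NOT the Cassandra client API.
sorted_keys = sorted([
    "2010-04-08", "2010-04-09", "2010-04-10",
    "2010-04-11", "2010-04-12", "2010-04-13",
])

def key_range(keys, start, end):
    """Return every key k with start <= k <= end, already in sorted order."""
    return keys[bisect_left(keys, start):bisect_right(keys, end)]

def page_forward(keys, after, page_size):
    """Return the next page of keys strictly greater than `after`.

    Pass after="" to start from the beginning of the key space.
    """
    begin = bisect_right(keys, after)
    return keys[begin:begin + page_size]

def walk_all(keys, page_size):
    """Visit every key in order, one page at a time."""
    seen, last = [], ""
    while True:
        page = page_forward(keys, last, page_size)
        if not page:
            return seen
        seen.extend(page)
        last = page[-1]
```

The pagination trick is simply to remember the last key of the previous page and ask for the keys that follow it. That only works when the store can answer "what comes after this key?", which is exactly what an ordered key space provides.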
On the other hand, using the same ordering for storage and querying has some serious side effects that you must plan for. For starters, the Cassandra wiki documentation says:
With OrderPreservingPartitioner the keys themselves are used to place on the ring. One of the potential drawbacks of this approach is that if rows are inserted with sequential keys, all the write load will go to the same node.
This is probably not strictly true, because more than one node may be responsible for the key range representing the “end” of the list of data, but it’s approximately true: the load will certainly not be shared by nodes representing the beginning or middle of the data.
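A small simulation shows the effect. Everything in it is an assumption made for illustration: the four-node ring, the token values, and the use of a modulo over the MD5 digest as a simplified stand-in for how a random partitioner assigns keys. Replication is ignored.

```python
import hashlib
from bisect import bisect_left
from collections import Counter

# Hypothetical four-node ring. Each node owns keys up to and including its token.
ORDERED_TOKENS = ["2010-03-31", "2010-06-30", "2010-09-30", "2010-12-31"]

def ordered_node(key):
    """Order-preserving placement: the key itself decides the node."""
    return bisect_left(ORDERED_TOKENS, key) % len(ORDERED_TOKENS)

def random_node(key, node_count=4):
    """Hash placement (simplified): an MD5 digest of the key decides the node."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % node_count

# Sequential keys: one event per minute over a single hour.
keys = ["2010-04-10T09:%02d" % minute for minute in range(60)]

ordered_load = Counter(ordered_node(k) for k in keys)
random_load = Counter(random_node(k) for k in keys)
```

With order-preserving placement, all 60 writes land on the one node whose range covers April. With hashed placement the same keys are spread over several nodes.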
This becomes even more problematic when you realize that the partitioner applies to all Column families in all Keyspaces. It is an unfortunate design consequence (flaw?) of Cassandra that you cannot set the partitioner by Column family or Keyspace. You must choose globally.
If you really need random partitioning for some column families and ordered for others, then you should do the randomization yourself by prepending some kind of hash (e.g. MD5) to each key. This is similar to what Cassandra would do automatically for you with a RandomPartitioner in any case. It’s just somewhat ugly to write client code to randomize your own content in order to work around a flaw in the datastore.
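The client-side randomization might look something like this. It is a minimal sketch: the helper names, the ":" separator, and the choice of a hex digest are my own assumptions, not anything Cassandra prescribes.

```python
import hashlib

PREFIX_LEN = 32   # length of an MD5 hex digest
SEPARATOR = ":"   # hypothetical separator between the digest and the real key

def randomized_key(natural_key):
    """Prefix the key with its own MD5 hex digest so that it lands at an
    effectively random position in an order-preserving ring."""
    digest = hashlib.md5(natural_key.encode("utf-8")).hexdigest()
    return digest + SEPARATOR + natural_key

def strip_prefix(stored_key):
    """Recover the original key from a stored, prefixed key."""
    return stored_key[PREFIX_LEN + len(SEPARATOR):]
```

Because the prefix is derived from the key itself, a client that knows the natural key can always recompute the stored key for a lookup. What you give up is any meaningful ordering of those rows.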
This flaw is exacerbated by three further facts:
- if you design a partitioner that partitions according to IEEE floating point comparison rules (as an extreme example), then you’ll end up with some really weird sorting of keys that are not really intended to be interpreted as numbers. You can work around that by converting everything to a uniform structure (typically just ordering by lexicographic byte order).
- imagine that all of your keys in a particular column family are URLs. This column family will tend to congregate on machines that manage the range of (“http://…”). If you do not care about the sorting of these keys then you can MD5-prepend them before insertion as described above. If you DO care, then you’re kind of out of luck. Now you have “hot” machines for (“http://…”) in one part of your cluster and hot machines for “today’s date” in another part of your cluster. Furthermore, the “hotness” could be in EITHER data size OR read load OR write load OR any subset of the three. (thanks to Dominic Williams for clarifying these aspects for me)
- you cannot change partitioners on a live cluster without exporting and reloading data. The Cassandra wiki says: “Achtung! Changing this parameter requires wiping your data directories, since the partitioner can modify the sstable on-disk format.” You must get this decision right at the very beginning of your design process. In this sense, Cassandra is not very “agile”.
On the other hand, running a second “cluster” of Cassandra does not necessarily imply running twice as many machines. A cluster could be nothing more than some more directories on each machine and a second JVM instance.
Even so, the OrderPreservingPartitioner has enough side effects that you may be hoping that the next alternative will be a magic bullet. Unfortunately it is not.
Sorting by Column Name
Cassandra always stores column names in order. So you get a certain amount of sorting “for free”.
You configure this sorting by setting a comparator for a particular column family using the CompareWith attribute in your conf/storage-conf.xml.
<ColumnFamily CompareWith="BytesType" Name="RawData"/>
<ColumnFamily CompareWith="UTF8Type" Name="UnicodeData"/>
<ColumnFamily CompareWith="TimeUUIDType" Name="SortedByTime"/>
The complete list of built-in comparators is:
- BytesType: Simple sort by byte value. No validation is performed.
- AsciiType: Like BytesType, but validates that the input can be parsed as US-ASCII.
- UTF8Type: A string encoded as UTF-8.
- LongType: A 64-bit long.
- LexicalUUIDType: A 128-bit UUID, compared lexically (by byte value).
- TimeUUIDType: A 128-bit version 1 UUID, compared by timestamp.
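The choice of comparator matters because the same column names can sort differently depending on how their bytes are interpreted. The sketch below models that in plain Python. It assumes a LongType column name is an 8-byte big-endian signed long; the helper names are mine, and none of this is Cassandra code.

```python
import struct

def pack_long(n):
    """Encode an integer as 8 big-endian bytes (assumed LongType encoding)."""
    return struct.pack(">q", n)

def bytes_type_key(name):
    """BytesType-style ordering: compare the raw bytes."""
    return name

def long_type_key(name):
    """LongType-style ordering: decode the bytes and compare numerically."""
    return struct.unpack(">q", name)[0]

names = [pack_long(n) for n in (3, -1, 20, 100)]

# Decode after sorting so the two orderings are easy to read.
by_bytes = [long_type_key(n) for n in sorted(names, key=bytes_type_key)]
by_long = [long_type_key(n) for n in sorted(names, key=long_type_key)]

# Numbers stored as text have a similar problem under a byte-wise sort.
text_order = sorted(["9", "10", "100"])
```

Here by_bytes comes out as [3, 20, 100, -1], because the bytes of a negative number start with 0xFF, while by_long is the numeric order [-1, 3, 20, 100]. The text example sorts as ["10", "100", "9"].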
The most interesting of these is the TimeUUIDType.
A “TimeUUID” is an old-fashioned form of UUID that is based on the time of day. TimeUUIDs are more appropriate for a distributed system than timestamps, because they are universally unique. Two different nodes in a cluster could generate the same timestamp, but if they are properly configured, they cannot generate the same TimeUUID.
Out of the box, Cassandra does not support TimeUUIDs for sort ordering in an OrderPreservingPartitioner. The timestamp encoded in a TimeUUID does not sort properly according to lexicographic rules. You could create your own Partitioner that parses them and partitions that way, but there is a simpler way. Generate keys by concatenating timestamps and UUIDs, or timestamps and machine identifiers (whether MAC addresses, IP addresses or whatever).
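Here is a sketch of both halves of that argument. The time_uuid helper builds a version 1 UUID by hand so that the timestamps are known, and composite_key is one hypothetical way to concatenate a timestamp with a machine identifier. The 20-digit zero padding and the "-" separator are my own choices.

```python
import uuid

def time_uuid(timestamp, node=1):
    """Build a version 1 UUID carrying the given 60-bit timestamp."""
    time_low = timestamp & 0xFFFFFFFF
    time_mid = (timestamp >> 32) & 0xFFFF
    time_hi_version = ((timestamp >> 48) & 0x0FFF) | 0x1000  # version 1
    return uuid.UUID(fields=(time_low, time_mid, time_hi_version, 0x80, 0, node))

def composite_key(timestamp, machine_id):
    """Zero-padded timestamp first, then a machine identifier as tie-breaker."""
    return "%020d-%s" % (timestamp, machine_id)

earlier, later = 0xFFFFFFFF, 0x100000000

# The low 32 bits of the timestamp come first in the UUID byte layout,
# so byte order and time order can disagree.
u_earlier, u_later = time_uuid(earlier), time_uuid(later)
uuid_bytes_in_time_order = u_earlier.bytes < u_later.bytes

# The composite key puts the most significant digits first.
k_earlier = composite_key(earlier, "host-a")
k_later = composite_key(later, "host-b")
composite_in_time_order = k_earlier < k_later
```

For these two timestamps the UUID bytes sort in the wrong order, while the composite keys sort chronologically. Two machines that generate the same timestamp still produce distinct composite keys, as long as their identifiers differ.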
Column Sort issues
Column sorting is great in that it happens for free, automatically, all of the time. But it isn’t appropriate for all jobs.
At this point I’m going to quote Dominic Williams, who covered this issue well:
… there is a really simple if brutal solution [to the OrderPreservingPartitioner issues]: simply store your index inside a single column family row as a series of columns. Since Cassandra can in principle cope with millions of columns, this is perfectly possible. Although it is true each index won’t be distributed across your whole cluster, the load will at the least be distributed across the nodes holding the replicas. If you use a typical replication factor (RF) of 3 the load associated with each index will be shared by 3 nodes etc.
In the vast majority of cases, this will be enough, and it will be sufficient that the rest of your data is properly balanced across your cluster.
But, I hear you saying, this is too brutal. Your index is too massive to fit on 3 nodes, is extremely hot and this just won’t work. You moved to Cassandra because you want your load distributed across your entire cluster. Period.
This is a perfectly reasonable point of view.
I’ll add some further caveats:
- while it is true that Cassandra can in principle cope with millions of columns, Cassandra 0.6 and earlier (latest versions as of this writing) cannot handle rows that grow larger than memory.
- if you have one or two keys on a particular machine that each have millions of columns, and another machine randomly happens to lack any such keys, then your load will get unbalanced again.
Before we get to workarounds, I should reiterate Dominic’s point that many people do not have millions or tens of millions of ordered items to deal with, and therefore do not need these workarounds.
That being said, the basic structure of any workaround to these issues is to use the random partitioner as a substrate and build ordered lists on top.
For example, what if the log events for every hour of every day were stored in their own row? The columns would be events, and the key would be “2010-04-10:9:00-10:00”. Using a random partitioner, these keys would naturally be spread across various servers. When you want to know what happened on “Saturday” you fetch 24 rows and you are off to the races.
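Generating those hourly row keys takes only a few lines. This is a sketch that copies the key format from the example above; the function names are hypothetical, and a real design would also need to decide on a time zone.

```python
from datetime import date, datetime

def hour_row_key(moment):
    """Row key for the hour containing `moment`, e.g. "2010-04-10:9:00-10:00"."""
    return "%s:%d:00-%d:00" % (
        moment.date().isoformat(), moment.hour, moment.hour + 1)

def row_keys_for_day(day):
    """The 24 row keys that together cover one calendar day, in time order."""
    return [hour_row_key(datetime(day.year, day.month, day.day, hour))
            for hour in range(24)]
```

A writer calls hour_row_key with the event time to find the row to append to. A reader calls row_keys_for_day and fetches the 24 rows, each of which already holds its columns in sorted order.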
Or here’s another option: what if the log events for a particular user or system on a particular day, were kept in a row specific to that user or system? As long as no particular user or system generated millions of events in a single hour, you’d never push row size limits.
Dominic also explains that for his full-text indexing use-case, he could split “big” keys automatically into two smaller keys. You might use some other column family to build an index on top of these chunks, although Dominic describes how that may be redundant for his specific use case.
Building on Dominic’s idea, you can imagine a Column Family that has a Row that contains a million columns, where each column is a “pointer” to a row in a different Column Family that holds a million log events. If a trillion events is not enough for you then you can add one more layer of indirection and you’ll get enough room to reference a quintillion events. Each “chunk” (Row) of a million events would be managed by a single machine and its replicas.
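The arithmetic behind that layering can be sketched as follows. The chunk size of one million comes from the text; the locate and capacity helpers are hypothetical names of my own, and the code only computes positions without touching any datastore.

```python
CHUNK_SIZE = 1_000_000  # entries per row, the figure used in the text

def capacity(levels):
    """How many events can be referenced with this many levels of rows."""
    return CHUNK_SIZE ** levels

def locate(event_index, levels=2):
    """Split a global event index into one position per level,
    outermost pointer first, innermost column offset last."""
    if not 0 <= event_index < capacity(levels):
        raise ValueError("event index out of range for this many levels")
    path = []
    for _ in range(levels):
        event_index, offset = divmod(event_index, CHUNK_SIZE)
        path.append(offset)
    return tuple(reversed(path))
```

With two levels, locate(3_000_007) returns (3, 7): follow pointer column 3 in the index row, then read column 7 of the chunk it names. Two levels give room for 10^12 events and three levels for 10^18, with one lookup per level.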
Of course, once you start nesting and indirecting, you’re going to add latency. That said: three queries to drill down into a data set of a quintillion items is not bad. Even so, if you’d like to avoid the overhead, read Dominic’s explanation of why Cassandra’s data structure may make indexing unnecessary for some use-cases.
I’d appreciate your thoughts on the following issues:
- are the reputed problems with the OrderPreservingPartitioner overblown?
- are there any open source libraries for hiding the complexity of the workarounds from developers? Should there be?
- could any of the “workarounds” be incorporated into Cassandra and become “features” which might obsolete the OrderPreservingPartitioner? For example, could Cassandra automatically split rows that cross some site-defined threshold and somehow abstract above the split so that the developer need not see it reflected in the API-view of the data? I’m thinking of the way that memory managers and file systems present linear abstractions on top of the messy reality of disk blocks and fragmented memory.