So in this lecture, we'll be looking at the design details of Apache Cassandra.
So Apache Cassandra is a distributed key-value store intended to
run in a data center and also across multiple data centers.
Originally, it was designed at Facebook as infrastructure for their messaging platform.
However later, they decided to open source it.
And today it's one of the most active Apache projects.
A lot of companies use Apache Cassandra in
their production clusters for a variety of different applications.
These include blue chip companies such as IBM,
Adobe, Hewlett-Packard, eBay, Ericsson, Symantec.
Also newer companies such as Twitter and Spotify use Cassandra.
Nonprofits such as PBS Kids use it.
And if you've ever used Netflix,
you've actually indirectly used Cassandra.
Essentially, Netflix uses Cassandra to keep track of positions in the video.
So, wherever you are in a particular video is kept track of in Cassandra.
So that, if you pause the video,
or if you stop it and then later resume,
the Cassandra key-value store is used to fetch your latest scene position.
So let's go inside the design of Cassandra itself.
So the first question that has to be
addressed about the design is given a key-value pair,
how do you map the key to a server?
How do you know which server stores that particular key-value pair?
Well, it turns out that because key-value stores are similar to distributed hash tables,
Cassandra uses a ring like the one we saw in distributed hash tables.
So you might remember this one from when we discussed peer-to-peer systems.
Essentially, Cassandra places servers in a virtual ring.
Again, the ring here consists of two to the power M points.
Here M is seven, so there are 128 points.
Each server is mapped to a point on the ring.
And essentially, then, you also map the keys to points on the ring.
A key gets stored at the servers that are successors of its point on the ring.
So for instance, a key with an ID of 13, which maps to point 13,
would get stored at servers 16,
and also 32, and also 45.
One of these might be the primary replica,
and the others might be backup replicas.
As far as Cassandra is concerned, though,
it doesn't necessarily need to have primary and backup replicas,
or to know which one is the primary and which are the backups;
it just knows them as replicas.
The client sends its queries to one of the servers in the system.
It need not be one of the replicas;
this server is called the coordinator.
And again, the coordinator need not be chosen on a per data center basis.
It could be chosen on a per client basis,
or on a per query basis; it doesn't really matter.
There's also one ring present per data center.
So if you have Cassandra running in two different data centers,
each of those data centers would be using
a separate ring with its own servers mapped to that ring.
What is not present in Cassandra, but was present
in a distributed hash table like Chord when we discussed it,
is the notion of finger tables, or routing tables.
So there is no routing used in Cassandra here.
Instead, when the client sends a query to the coordinator,
the coordinator simply forwards the query
to the appropriate replicas, or just some of the replicas,
for that particular key.
This means that every server, which could be the coordinator,
needs to know about where the keys are stored on every other server in the system.
We'll come back to this later on,
and discuss how servers know about each other.
But for now, let's focus on the key to server mapping.
So the mapping from key to server is called the
"Partitioner", and that is what the coordinator uses
here to find out which replica servers to
forward a particular query, say for key 13, to.
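To make that concrete, here is a minimal sketch of a hash-based ring partitioner, assuming a 128-point ring, the example server positions 16, 32, and 45 plus a few made-up ones, and three replicas per key; this is just an illustration, not Cassandra's actual code.

```python
import hashlib

RING_SIZE = 2 ** 7                              # 128 points, as in the example ring
SERVERS = sorted([16, 32, 45, 64, 96, 112])     # hypothetical server positions on the ring

def ring_point(key: str) -> int:
    """Hash a key onto the ring (RandomPartitioner-style)."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % RING_SIZE

def replicas_for(key: str, n: int = 3) -> list:
    """Return the n successor servers, walking clockwise from the key's point."""
    point = ring_point(key)
    start = next((i for i, s in enumerate(SERVERS) if s >= point), 0)
    return [SERVERS[(start + i) % len(SERVERS)] for i in range(n)]

# A key whose ring point is 13 would be stored at servers 16, 32, and 45.
```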
So, there are different kinds of replication strategies.
There are two main classes that are supported by Cassandra.
They are the SimpleStrategy and the NetworkTopologyStrategy.
The SimpleStrategy uses the partitioner,
and there are two kinds of partitioners.
The RandomPartitioner uses hash-based partitioning just like Chord.
So essentially, this is the version of Cassandra that is most
similar to Chord, where keys are hashed to a point on the ring,
and then are stored at servers that are close to that point on the ring,
or are assigned to that segment of the ring.
So for instance, if you assign to each server
the segment of the ring between that server's point
and its predecessor's point, then this becomes very similar to Chord's hash-based partitioning.
There's also the ByteOrderedPartitioner, which assigns ranges of keys to servers.
You may want to maintain a table in
Cassandra that preserves the ordering of the keys in it.
For instance, the keys might be timestamps,
and you might want to maintain the ordering of the timestamps.
This partitioner does not hash the keys;
instead it simply maps each key onto a point on
the ring based on the value of the key itself,
so that two keys that are in order in
the real key space are also in the same order in the ring space.
This is useful for range queries.
For instance, if you want to get all the Twitter users starting with A or B,
then you can do a range query very easily
with the ByteOrderedPartitioner by simply searching
the servers that store that particular range of keys.
If you were using hash-based partitioning instead,
you'd essentially have to ask every single server in
that particular ring to see if it has any
key-value pairs that match this particular range.
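Here is a rough sketch of that difference, assuming hypothetically that under the ByteOrderedPartitioner each server owns one contiguous, lexicographically ordered range of keys (the ranges and server names are made up):

```python
# Hypothetical assignment of lexicographic key ranges to servers.
RANGES = [("a", "h", "server1"), ("h", "p", "server2"), ("p", "{", "server3")]

def servers_for_range(lo: str, hi: str) -> set:
    """With byte-ordered keys, a range query only touches servers whose ranges
    overlap [lo, hi); with hashed keys it would have to touch every server."""
    return {srv for start, end, srv in RANGES if start < hi and lo < end}

print(servers_for_range("a", "c"))   # {'server1'} -- e.g. users starting with A or B
```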
The NetworkTopologyStrategy is a different strategy
which is used for multi-data center deployments.
It supports a variety of different configurations.
It supports one configuration where you have
two replicas of each key per data center.
So if you have three data centers,
then you essentially have six replicas of each key in total, two in each data center.
There are also other configurations,
for instance with three replicas per data center.
Now, how do you select replicas within each data center?
So on a per data center basis,
you first place the first replica according to the partitioner.
Again, you can use a RandomPartitioner,
or a ByteOrderedPartitioner here.
And then for the next replica, you want to make sure that you are
storing that second replica on a different rack.
Why? Well, you don't want a rack failure within
the data center to lose all copies of a given key.
So you place the second copy of a key on a different rack.
Essentially, you go around the ring clockwise
until you encounter the first server that is on a different rack from
the first replica that you placed,
and you place the second copy of the key at that server.
So this ensures that there is rack fault tolerance,
and it also ensures that there are two replicas,
in this particular case, for that key in that data center.
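Here is a minimal sketch of that clockwise walk, assuming we already know each server's rack; the ring positions and rack names are made up:

```python
# Hypothetical ring order of servers, each with the rack it sits on.
RING = [(16, "rack1"), (32, "rack1"), (45, "rack2"), (64, "rack3")]

def place_two_replicas(first_index: int) -> list:
    """Keep the partitioner-chosen server as the first replica, then walk the ring
    clockwise until a server on a different rack is found for the second copy."""
    first_server, first_rack = RING[first_index]
    i = (first_index + 1) % len(RING)
    while RING[i][1] == first_rack:
        i = (i + 1) % len(RING)
    return [first_server, RING[i][0]]

print(place_two_replicas(0))   # [16, 45] -- server 32 is skipped, same rack as 16
```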
So, Cassandra uses a mechanism known as Snitches to
map IP addresses to racks and data centers.
You can configure this in the cassandra.yaml configuration file.
Cassandra offers a variety of Snitch options and I'm
going to discuss a few of those options here.
And the first option is the SimpleSnitch,
which is unaware of the topology.
In other words, your application really cannot know
much about which IP addresses map to which racks and which data centers.
In some cases this works; for instance,
if you're running only one VM,
then you don't really need a Snitch, and the SimpleSnitch is fine.
The second Snitch is the RackInferringSnitch which tries to guess from
the IP address what the rack and the data center might be.
Essentially, the IP address breaks up into four octets.
And the first octet is ignored.
The second octet is used to map to the data center.
The third is used to map to the rack,
and the fourth is used to map to the node.
So for instance, if I have an IP address
101.102.103.104, then essentially 102 is the data center octet.
So any other IP address that starts with 101.102 will
belong to the same data center as this particular IP address given over here.
The third octet is the rack octet.
So any other IP address that starts with 101.102.103
will belong to the same data center and rack as this IP address given here.
And finally, the last octet, 104, identifies the node.
So any other IP address that has exactly the same four octets shown over here
is the identical address, referring to
the same server or VM as this particular IP address shown over here.
And this of course is a best-effort guess,
because IP addresses may not actually map to racks and data centers as shown over here.
But not knowing anything else,
this is a pretty reasonable guess.
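Here is a small sketch of that inference rule; it mirrors the description above rather than the actual snitch implementation:

```python
def infer_location(ip: str) -> dict:
    """RackInferringSnitch-style guess: the second octet names the data center,
    the third the rack, the fourth the node; the first octet is ignored."""
    _, dc, rack, node = ip.split(".")
    return {"datacenter": dc, "rack": rack, "node": node}

print(infer_location("101.102.103.104"))
# {'datacenter': '102', 'rack': '103', 'node': '104'}
```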
You can also, if you want to be really accurate
about which IP addresses map to which racks and data centers,
and you actually know this information,
put this mapping in a properties file; this is called the PropertyFileSnitch.
You write this mapping into a configuration file.
Finally, if you're running Cassandra on EC2:
EC2 of course uses regions, and within regions there are availability zones.
So the EC2 Snitch in Cassandra lets you treat the availability zone as the rack
and the EC2 region as the data center.
And you can then use this information to do the Cassandra mapping,
for instance for replication within
the data center and across racks, as we discussed previously,
and also across data centers.
There are a variety of other Snitch options available as well in Cassandra.
If you're interested in this, please look at the Cassandra web pages.
So how are writes and reads supported?
So, remember that when a client sends a write to a coordinator,
the coordinator needs to forward the write to one or more replicas for
the particular key that is present in the write itself.
Writes need to be lock-free because we're dealing with write-heavy workloads here,
and they need to be fast.
So you want to incur few disk reads or seeks,
and preferably no disk access at all in the critical write path.
So the client sends the write to one coordinator node in the Cassandra cluster.
The coordinator, again, may be per-key, per-client, or per-query.
If you have a per-key coordinator,
this ensures that all the writes to
that particular key are serialized by that coordinator.
But again, this is not a requirement for Cassandra.
When the coordinator receives this write,
it uses the partitioner to find out which replicas map to that particular key.
It then sends the query to all the replica nodes responsible for that particular key.
Then some of the replicas respond; when X replicas respond,
the coordinator returns an acknowledgement to the client that the write is done.
What is this value of X? We'll see that later.
For now, let's just assume that X is
some small number specified by the client that is
less than or equal to the number of replicas for that particular key.
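Here is a rough sketch of that coordinator behaviour; the send_write callable is a stand-in for the real replica RPC, and x is the client-specified acknowledgement count:

```python
def coordinate_write(key, value, replicas, x, send_write):
    """Forward the write to every replica responsible for the key, count the
    acknowledgements, and tell the client the write succeeded once at least
    x replicas have responded."""
    acks = sum(1 for replica in replicas if send_write(replica, key, value))
    return "ack to client" if acks >= x else "write not acknowledged"
```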
So, keys always need to be writable.
You want to make sure that even if you have failures,
writes succeed and return an acknowledgment to the client.
For this, Cassandra uses what is known as a Hinted Handoff mechanism.
This works as follows: if any replica is down,
the coordinator writes to all the other replicas,
but it also keeps a copy of the write locally,
and when that down replica comes back up,
it is then sent a copy of that write.
This ensures that replicas, when they fail and then recover,
receive from the coordinator some of the latest writes.
However, it's possible that all the replicas are down.
It may be unlikely, but it's possible that you have three replicas for the key,
and when a write comes along,
all three replicas happen to have failed.
In this case, instead of rejecting the write,
the coordinator stores the write locally at itself; it buffers the write.
And then, when one or more of the replicas comes back up,
it releases the writes to the corresponding replicas.
The coordinator of course has a timeout on how long it stores these writes.
Otherwise, it might be storing writes forever.
And this timeout is typically a few hours, which is good
enough for replicas to recover and rejoin the system.
Again, this mechanism is known as Hinted Handoff
because the coordinator, based on the hints it receives from
the failure information, might
assume ownership of that key for just a temporary duration of time.
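A minimal sketch of the hinted handoff idea, with a made-up in-memory hint buffer and a timeout of a few hours:

```python
import time

HINT_TIMEOUT = 3 * 60 * 60     # keep hints for a few hours (in seconds)
hints = []                     # buffered writes: (timestamp, replica, key, value)

def write_with_hints(key, value, replicas, is_up, send_write):
    """Write to the replicas that are up; buffer a hint for each replica that is
    down so the write can be handed off when that replica recovers."""
    for replica in replicas:
        if is_up(replica):
            send_write(replica, key, value)
        else:
            hints.append((time.time(), replica, key, value))

def replay_hints(recovered_replica, send_write):
    """When a replica rejoins, hand off its buffered writes, dropping stale hints."""
    global hints
    now = time.time()
    hints = [h for h in hints if now - h[0] < HINT_TIMEOUT]      # expire old hints
    for _, replica, key, value in hints:
        if replica == recovered_replica:
            send_write(replica, key, value)
    hints = [h for h in hints if h[1] != recovered_replica]      # drop delivered hints
```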
So, if you have multiple data centers running Cassandra,
you maintain one ring per data center,
and you might also use a per data center coordinator.
This coordinator is different from the coordinator used by client queries.
The per data center coordinators coordinate with
each other to make sure
that data center to data center coordination is done in a correct manner.
The election of the per data center coordinator is done by Zookeeper,
which is again an open-source Apache system that runs a variant of
Paxos; we'll see Paxos later and elsewhere in this course.
So, what does a replica server do when the coordinator forwards it a write?
Well, the first thing it does is log information about the write,
just a little bit of information, in a commit log that is present on disk.
This is used for failure recovery,
so that if the replica server fails at any point of time from here on,
it can tell by looking at the disk log after it recovers that there was
some write that it completed only partially,
and then it can ask the coordinator for information about this write.
After this, the replica server updates its memtable.
A memtable is an in-memory representation of multiple key-value pairs.
Essentially, the memtable maintains some of the latest key-value pairs that
have been written at this particular replica server.
It's a cache that can be searched by key, so you can
quickly find the particular key that you're trying to write.
If it's present in the memtable, you update the corresponding value;
if it's not present in the memtable, then you
simply append another key-value pair to the memtable.
This memtable is called a write-back cache, meaning that you are storing the write
temporarily and tentatively in memory rather than writing it directly to disk.
If you were writing it directly to disk, it would be called a write-through cache,
and write-through caches are much slower than write-back caches.
However, the memtable has a maximum size, and when this maximum size is reached, or
when the memtable is very old, it is flushed to disk.
Essentially, you create an SSTable, or
a Sorted String Table: you take the memtable and you
sort all the keys within it, so that
the keys are sorted and the values are stored alongside the keys,
and then you store it on disk as an SSTable.
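Putting those steps together, here is a toy version of the replica-side write path: append to a commit log for recovery, update the in-memory memtable, and flush it as a sorted SSTable once it grows too large. The sizes and file handling here are simplified assumptions.

```python
MEMTABLE_LIMIT = 4        # tiny flush threshold, just for illustration
memtable = {}             # in-memory key -> value cache (the write-back cache)
sstables = []             # flushed, immutable sorted tables

def replica_write(key, value, commit_log):
    """Log the write for crash recovery, then update the memtable;
    flush the memtable to an SSTable when it reaches its size limit."""
    commit_log.write(f"{key}={value}\n")   # small commit log entry on disk
    commit_log.flush()
    memtable[key] = value                  # update or append in the cache
    if len(memtable) >= MEMTABLE_LIMIT:
        sstables.append(sorted(memtable.items()))   # sorted keys => an "SSTable"
        memtable.clear()
```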
Now, if you want to search for a particular key in the data file itself,
that is, the data file with the key-value pairs, it might take a long time
because it contains both the keys and
the values. So you maintain an index file as well,
which stores an SSTable of (key,
position in data SSTable) pairs, so that you can quickly look up the position of
a particular key in the data SSTable by looking at the index file SSTable itself.
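A sketch of that index lookup, assuming the index file has been loaded as a sorted list of (key, offset) pairs; real Cassandra keeps this structure on disk:

```python
import bisect

def lookup_offset(index, key):
    """Binary-search a sorted list of (key, offset-in-data-SSTable) pairs;
    return the offset of the key's entry in the data SSTable, or None."""
    i = bisect.bisect_left(index, (key,))
    if i < len(index) and index[i][0] == key:
        return index[i][1]
    return None

index = [("apple", 0), ("mango", 42), ("zebra", 97)]
print(lookup_offset(index, "mango"))   # 42
```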
However, in many cases you might be checking
for the existence of a key in an SSTable when the key is just not there.
In this case, doing a binary search through the index file to look for
that key would result in a very large overhead.
If you have a large number of SSTables,
each SSTable containing M entries, then you're
going to incur an O(log M) overhead
per SSTable in trying to look for a particular key.
And so what you do is you add a Bloom filter,
which is a quick way of checking whether
or not a particular key is present in an SSTable.
So a Bloom filter is a well-known data structure which Cassandra uses.
So what is a Bloom filter? And what does it look like?
Well, here's an example of a Bloom filter.
A Bloom filter is a compact way of representing a set of items so that
the most common operation, which is checking for existence in
that set, becomes very cheap and very low overhead.
The most common operation for a Bloom filter is asking:
is this particular key present in this particular set or not?
The other operation that the Bloom filter supports is
inserting an item into the Bloom filter.
The Bloom filter has some probability of false positives:
an item not in the set may
be returned as being in the set. For the Cassandra example,
this just results in a little bit of extra overhead when you look through
the index file and then subsequently do not find that item.
But you never have false negatives,
which means that if you've inserted an item into the set, you
will always return true when that item is checked for membership.
So, how does a Bloom filter work?
Well, a Bloom filter is essentially a very large bitmap.
Here's an example of a Bloom filter,
which has 128 bits, numbered 0-127.
Each of the bits has a number as an ID.
Initially, all the bits are zero, so the entire Bloom filter is zero.
When you have a key, say K, you use a small set of k hash functions, hash1 through hashk, to
hash that key K. Each of these hashes returns a number between 0 and 127, both ends inclusive.
For instance, hash1 applied to the key K returns the value 1.
If you're trying to insert the key K,
you're going to set that bit to one over here.
Again, hash2, when applied to the key K,
returns a value of 69,
and so if you are inserting the key K,
you would set bit number 69 to one.
Similarly, hashk returns a value of 111, and you set that bit to one.
So whenever you want to insert a particular key, you set
all of its k hashed bit positions to ones.
If you insert a second key, you would go ahead and set the corresponding bits
that particular key maps to as ones.
If any of those bits is already one, you leave it as one.
So you can see that over time, as many keys are inserted into the system,
more and more of the bits in this particular Bloom filter are set to one.
Now, what happens when you want to check whether a particular key is
present in the Bloom filter or not?
Well, once again you do the same thing: you take the key,
you hash it using the k hash functions, and then
check whether all of those hashed bits are ones.
If any of them is a zero then you return false
saying this particular key is not present in the Bloom filter.
And this is the correct answer because if that key had been
inserted in the Bloom filter all those bits would have been one.
If all the hashed bits of this key K are set, then you return true, saying that
the key is present in the Bloom filter.
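Here is a compact sketch of such a Bloom filter, with 128 bits and k bit positions derived from MD5; the exact hashing scheme is just one reasonable choice, not necessarily Cassandra's:

```python
import hashlib

class BloomFilter:
    def __init__(self, num_bits=128, num_hashes=3):
        self.bits = [0] * num_bits            # all bits start at zero
        self.m, self.k = num_bits, num_hashes

    def _positions(self, key):
        """Derive k bit positions in [0, m) for the key."""
        return [int(hashlib.md5(f"{i}:{key}".encode()).hexdigest(), 16) % self.m
                for i in range(self.k)]

    def insert(self, key):
        for pos in self._positions(key):      # set every hashed bit to one
            self.bits[pos] = 1

    def maybe_contains(self, key):
        """False means definitely absent; True means present or a false positive."""
        return all(self.bits[pos] for pos in self._positions(key))

bf = BloomFilter()
bf.insert("key-K")
print(bf.maybe_contains("key-K"))       # True -- never a false negative
print(bf.maybe_contains("other-key"))   # usually False; occasionally a false positive
```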
Well, this is not guaranteed to be correct.
However, it is correct with high probability.
It's possible that this key was
never inserted into the Bloom filter, and that half of
these bits were set by one other key and
the other half of the bits were set by a different key.
And so you might end up returning an answer of true for
membership even though that key was never inserted into the system.
This is what is known as a false positive.
However, the false positive rate can be tuned to be very low.
For instance, if you use 4 hash functions and insert 100 items into
a Bloom filter with 3,200 bits, that's just 3.2 kilobits,
the false positive rate is as low as 0.02%.
So 0.02%, or 0.0002, of
the membership checks will return an answer of true when the real answer is false.
And you can tune the false positive rate to be even lower, for instance by
increasing the number of bits that are present in the Bloom filter itself.
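You can sanity-check that 0.02% figure with the standard Bloom filter false-positive approximation, (1 - e^(-kn/m))^k:

```python
import math

k, n, m = 4, 100, 3200                 # 4 hash functions, 100 items, 3,200 bits
fp_rate = (1 - math.exp(-k * n / m)) ** k
print(f"{fp_rate:.4%}")                # roughly 0.02%
```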