article | 03 Aug 2022
Apache Cassandra decomposed into prime factors
Dima Pleczko

How Apache Cassandra works

In this article, I would like to talk about how Cassandra, a decentralized, fault-tolerant, and reliable key-value database, works.

 

The database itself takes care of single points of failure, server outages, and data distribution between cluster nodes. It does so both when all servers are placed in one data center and in configurations with many data centers separated by distance and, accordingly, by network latency. Reliability here means eventual consistency of the data, with the ability to set a tunable consistency level for each query.

NoSQL databases generally require more insight into their inner workings than SQL databases. This article will describe the basic structure, and the following articles will discuss: CQL and the programming interface; design and optimization techniques; features of clusters deployed in multiple data centers.

 

Data model

An application works with a keyspace, which corresponds to the notion of a database schema in the relational model. A keyspace can contain several column families, each of which corresponds to the notion of a relational table. In turn, column families contain columns that are combined by a row key into a record. A column consists of three parts: column name, timestamp, and value. Columns within a record are ordered.

 


In contrast to a relational database, records (rows, in database terms) are not required to contain columns with the same names as other records. Column families can be of several kinds, but we will omit that detail in this article. Recent versions of Cassandra have also added the ability to run data definition and data manipulation queries (DDL, DML) in the CQL language and to create secondary indices.

A specific value stored in Cassandra is identified by:

  • the keyspace, which binds it to an application (subject area). This allows data from different applications to be placed on one cluster;
  • the column family, which binds it to a query;
  • the key, which binds it to cluster nodes. The key determines which nodes the saved columns land on;
  • the column name, which binds it to an attribute in a record. This allows multiple values to be stored in one record.

Each value has an associated timestamp, a user-defined number that is used to resolve conflicts during writes: the higher the number, the newer the column is considered, and it overwrites older columns during comparisons.
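
As a rough illustration of the addressing scheme and the timestamp rule described above, the sketch below models a keyspace as nested Python dictionaries. The names `Column`, `write_column`, and the sample `users` column family are hypothetical and exist only for this example; this is not how Cassandra stores data internally.

```python
from collections import namedtuple

# A column is a (name, value, timestamp) triple, as described in the text.
Column = namedtuple("Column", ["name", "value", "timestamp"])

# keyspace -> column family -> row key -> column name -> Column
keyspace = {"users": {}}  # hypothetical keyspace with one column family


def write_column(cf, row_key, name, value, timestamp):
    """Store the column unless one with an equal or higher timestamp exists."""
    row = cf.setdefault(row_key, {})
    current = row.get(name)
    # Assumption of this sketch: on equal timestamps the stored column wins.
    if current is None or timestamp > current.timestamp:
        row[name] = Column(name, value, timestamp)


users = keyspace["users"]
write_column(users, b"alice", b"email", b"alice@old.example", timestamp=100)
write_column(users, b"alice", b"email", b"alice@new.example", timestamp=200)
# A late-arriving write with an older timestamp loses the comparison.
write_column(users, b"alice", b"email", b"alice@stale.example", timestamp=150)

print(users[b"alice"][b"email"].value)
```

The last write arrives third but carries a lower timestamp than the stored column, so the value written with timestamp 200 remains.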

By data types: keyspace and column family names are strings; a timestamp is a 64-bit number; and the key, column name, and column value are arrays of bytes. Cassandra also has a concept of data types. These types can optionally be specified when creating a column family. For column names, the type is called a comparator; for values and keys, it is called a validator. The first determines which byte values are valid for column names and how to order them. The second determines which byte values are valid for column values and keys. If these data types are not specified, Cassandra stores the values and compares them as byte strings (BytesType), since that is how they are stored internally.

The data types are as follows:

  • BytesType: any byte string (no validation)
  • AsciiType: ASCII string
  • UTF8Type: UTF-8 string
  • IntegerType: integer of arbitrary size
  • Int32Type: 4-byte integer
  • LongType: 8-byte integer
  • UUIDType: a UUID of type 1 or 4
  • TimeUUIDType: a UUID of type 1
  • DateType: 8-byte timestamp value
  • BooleanType: two values: true = 1 or false = 0
  • FloatType: 4-byte floating point number
  • DoubleType: 8-byte floating point number
  • DecimalType: arbitrary-precision decimal number
  • CounterColumnType: 8-byte counter
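
To illustrate why the choice of comparator matters, here is a small sketch that orders the same column names first as raw bytes and then as numbers. It uses plain Python sorting as a stand-in for a comparator; decoding the names from ASCII digits is an assumption made for this example, not Cassandra's own encoding.

```python
# Column names are byte arrays; the comparator decides how they are ordered.
column_names = [b"10", b"9", b"100"]

# BytesType-like ordering: compare the raw bytes lexicographically.
as_bytes = sorted(column_names)

# Integer-like ordering: interpret each name as a number before comparing.
# (Decoding ASCII digits is a simplification made for this example.)
as_numbers = sorted(column_names, key=lambda name: int(name.decode("ascii")))

print(as_bytes)
print(as_numbers)
```

The byte ordering places b"9" last because its first byte is the largest, while the numeric ordering places it first.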

In Cassandra, all data write operations are always overwrite operations. That is, if a column with the same key and name already exists in the column family and the incoming timestamp is larger than the stored one, the value is overwritten. Written values are never modified in place; newer columns simply arrive with new values.

Writes in Cassandra are faster than reads. This changes the approach taken in data model design.

 

If you look at Cassandra from a data model design perspective, it is easier to think of a column family not as a table but as a materialized view - a structure that represents the result of some complex query but is stored on disk. Instead of trying to assemble the data with queries, it is better to store everything a query might need in a column family. In other words, the design should start not from the relationships between entities or objects but from the queries: which fields should be selected, in what order the records should go, and which related data should be fetched together with the main data. All of this should already be stored in the column family. The number of columns in a record is theoretically limited to 2 billion. This is a short digression; you can find more details in the design and optimization techniques article. Now let's go deeper into how data is saved to Cassandra and read back.

 

Data Distribution

Let's see how data are distributed in cluster nodes depending on the key. Cassandra allows you to specify the data distribution strategy.
 


The first strategy distributes data according to the md5 hash of the key (random partitioner). The second uses the byte representation of the key itself (byte-ordered partitioner). In most cases the first strategy is preferable, because you do not have to worry about distributing data evenly between servers and similar problems. The second strategy is used rarely, for example when range scans are needed. It is important to note that the strategy is chosen before the cluster is created and, in practice, cannot be changed without completely reloading the data.

Cassandra uses a technique known as consistent hashing to distribute the data.

 

This approach distributes the data between nodes and ensures that when a node is added or removed, the amount of data that has to be moved is small. To do this, each node is assigned a token, and the tokens partition the set of all md5 key values. Since RandomPartitioner is used in most cases, let's take a look at it. As I said before, RandomPartitioner calculates a 128-bit md5 for each key. To determine which nodes will store the data, Cassandra goes through all node tokens from the smallest to the largest, and when a token value becomes greater than the key's md5 value, that node, together with some number of subsequent nodes (in token order), is selected for storage. The total number of selected nodes must equal the replication factor. The replication factor is set for each keyspace and regulates data redundancy. Before you add a node to the cluster, you must set a token for it. The share of keys that fall into the gap between this token and the adjacent one determines how much data will be stored on the node. The whole set of tokens for a cluster is called a ring.
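
The lookup described above can be sketched in a few lines of Python. This is a minimal illustration, not Cassandra's implementation: the functions `md5_token` and `replicas_for`, the four-node ring, and the wrap-around to the smallest token at the end of the ring are assumptions of this example.

```python
import bisect
import hashlib


def md5_token(key: bytes) -> int:
    """Map a key to a 128-bit integer derived from its md5 digest."""
    return int.from_bytes(hashlib.md5(key).digest(), "big")


def replicas_for(key: bytes, ring: list, replication_factor: int) -> list:
    """Walk the ring from the first token greater than the key's hash.

    `ring` is a list of (token, node_name) pairs sorted by token.
    """
    tokens = [token for token, _ in ring]
    start = bisect.bisect_right(tokens, md5_token(key))
    count = min(replication_factor, len(ring))
    # Assumption: the modulo wraps around to the smallest token of the ring.
    return [ring[(start + i) % len(ring)][1] for i in range(count)]


# Four hypothetical nodes with evenly spaced tokens over the 128-bit space.
ring = [(i * 2**128 // 4, f"node{i}") for i in range(4)]
print(replicas_for(b"user:42", ring, replication_factor=3))
```

Because only the tokens adjacent to a new node change owners, adding a fifth token to `ring` would move just the keys that fall into one gap, which is the property the text attributes to consistent hashing.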

 

Data Consistency

Cassandra cluster nodes are peer-to-peer, and clients can connect to any of them for writing or reading. Requests go through a coordination stage, during which the server sends requests to the nodes where the data should be located using a key and a partitioner.

 


We will call the node that performs the coordination the coordinator, and the nodes selected to store the record with a given key the replica nodes. Physically, the coordinator can be one of the replica nodes - it depends only on the key, the partitioner, and the tokens.

For each request, both read and write, it is possible to set a data consistency level. For writes, this level affects the number of replica nodes from which the coordinator waits for confirmation of successful completion (data written) before returning control to the user.

For writes, there are these consistency levels:

  • ONE - the coordinator sends requests to all replica nodes but returns control to the user after the first node confirms;
  • TWO - the same, but the coordinator waits for confirmation from the first two nodes before returning control;
  • THREE - the same, but the coordinator waits for confirmation from the first three nodes before returning control;
  • QUORUM - a quorum is collected: the coordinator waits for confirmation of the write from more than half of the replica nodes, namely floor(N / 2) + 1, where N is the replication factor;
  • LOCAL_QUORUM - the coordinator waits for acknowledgment from more than half of the replica nodes in the data center where the coordinator is located (potentially a different one for each query). This avoids the delays of forwarding data to other data centers. Working with many data centers is discussed in this article only in passing;
  • EACH_QUORUM - the coordinator waits for acknowledgment from more than half of the replica nodes in each data center independently;
  • ALL - the coordinator waits for acknowledgment from all the replica nodes;
  • ANY - allows data to be written even if none of the replica nodes respond. The coordinator waits either for the first response from one of the replica nodes or for the data to be saved as a hinted handoff on the coordinator.

 

For reads, the consistency level will affect the number of replica nodes to be read from. For reads, there are these consistency levels:

  • ONE - the coordinator sends a request to the nearest replica node. The remaining replicas are also read for read repair, with the probability specified in the Cassandra configuration;
  • TWO - the same, but the coordinator sends requests to the two nearest nodes. The value with the larger timestamp is selected;
  • THREE - the same as the previous one, but with three nodes;
  • QUORUM - a quorum is gathered, i.e., the coordinator sends requests to more than half of the replica nodes, namely floor(N / 2) + 1, where N is the replication factor;
  • LOCAL_QUORUM - a quorum is collected in the data center where the coordination takes place, and the data with the latest timestamp is returned;
  • EACH_QUORUM - the coordinator returns data after a quorum is collected in each of the data centers;
  • ALL - the coordinator returns data after reading from all replica nodes.
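
Since QUORUM means "more than half of the replica nodes", its size can be computed with integer division. The helper name `quorum` below is hypothetical and is used only to make the arithmetic concrete.

```python
def quorum(replication_factor: int) -> int:
    """Smallest number of replicas that is more than half of them."""
    return replication_factor // 2 + 1


for n in (1, 2, 3, 5, 6):
    tolerated = n - quorum(n)
    print(f"N={n}: quorum={quorum(n)}, tolerates {tolerated} unavailable replicas")
```

Note that an even replication factor tolerates no more failures than the odd one below it: with N = 6 a quorum is 4, which leaves room for two unavailable replicas, the same as with N = 5.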

 

Thus, it is possible to adjust the latency of read and write operations and to tune the consistency and, as a consequence, the availability of each type of operation. In fact, availability is directly related to the consistency level of read and write operations, because the level determines how many replica nodes can fail while those operations are still confirmed. If the number of nodes that acknowledge a write plus the number of nodes that are read from is greater than the replication factor, then we have a guarantee that once a value is written, the new value will always be read. This is called strong consistency. Without strong consistency, a read operation may return obsolete data.

 

In any case, the value will propagate to the remaining replicas, but only after the coordinator has finished waiting. This propagation is called eventual consistency. If not all replica nodes are available during a write, recovery mechanisms such as read repair and anti-entropy node repair will sooner or later come into play. More on this later.

 

So, with the QUORUM consistency level on both reads and writes, there will always be strong consistency, and this gives a balance between read and write latency. With ALL writes and ONE reads there will also be strong consistency; read operations will be faster and more available, meaning that the number of failed nodes at which reads still succeed may be greater than with QUORUM. Write operations, on the other hand, will require all replica nodes to be up. With ONE writes and ALL reads there will be strong consistency as well; write operations will be faster, and write availability will be high because it is enough for the write to be confirmed by at least one server. Reads, in contrast, will be slower and will require all replica nodes. If the application has no strong consistency requirement, it is possible to speed up both reads and writes and to improve availability by setting lower consistency levels.
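
The combinations above all satisfy a rule that is commonly written as W + R > N, where W is the number of replicas that must acknowledge a write, R is the number of replicas read, and N is the replication factor: any read set then overlaps any write set in at least one replica. The sketch below checks this for a single data center; the function names and the replication factor of 3 are assumptions of the example.

```python
def replicas_required(level: str, n: int) -> int:
    """Replicas the coordinator waits for at a given level (single data center)."""
    fixed = {"ONE": 1, "TWO": 2, "THREE": 3}
    if level in fixed:
        return fixed[level]
    if level == "QUORUM":
        return n // 2 + 1
    if level == "ALL":
        return n
    raise ValueError(f"level not covered by this sketch: {level}")


def is_strongly_consistent(write_level: str, read_level: str, n: int) -> bool:
    """True when every read set must overlap every write set: W + R > N."""
    return replicas_required(write_level, n) + replicas_required(read_level, n) > n


N = 3  # replication factor assumed for the example
for w, r in [("QUORUM", "QUORUM"), ("ALL", "ONE"), ("ONE", "ALL"), ("ONE", "ONE")]:
    print(w, r, is_strongly_consistent(w, r, N))
```

Only the last pair, ONE writes with ONE reads, fails the check: a read may hit a replica that the write has not reached yet.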

 

Data recovery


Cassandra supports three data recovery mechanisms:

  • Read repair - during reads, data is requested from all replicas and compared after coordination is complete. The column with the latest timestamp is propagated to the nodes whose timestamps are obsolete.
  • Hinted handoff - allows information about a write operation to be saved on the coordinator if the write to any of the nodes fails. Later, when possible, the write is retried. This allows data to be recovered quickly when a node is absent from the cluster for a short time. In addition, with the ANY consistency level it provides absolute write availability: even when all replica nodes are unreachable, the write operation is confirmed and the data is kept on the coordinator node.
  • Anti-entropy node repair - a process of restoring all replicas, which should be started manually on a regular basis using the "nodetool repair" command. It maintains the number of replicas of all data, including data that may not have been restored by the first two methods, at the required replication level.
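
The comparison step of read repair can be sketched as follows. This is a simplified model under stated assumptions: the function `read_repair` and the replica names are hypothetical, and a real coordinator would send the newest column to the stale replicas rather than just list them.

```python
def read_repair(responses: dict):
    """Pick the newest value and list replicas that hold an older one.

    `responses` maps a replica name to a (value, timestamp) pair.
    """
    newest_value, newest_ts = max(responses.values(), key=lambda pair: pair[1])
    stale = sorted(name for name, (_, ts) in responses.items() if ts < newest_ts)
    return newest_value, newest_ts, stale


# Hypothetical responses from three replicas for one column.
responses = {
    "replica1": ("blue", 200),
    "replica2": ("green", 150),
    "replica3": ("blue", 200),
}
value, timestamp, to_repair = read_repair(responses)
print(value, timestamp, to_repair)
```

Here `replica2` still holds the column with timestamp 150, so it is the only replica that would receive the newer value.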

 

Writing to disk

When data arrives at a node after coordination to be written, it goes into two data structures: an in-memory table (memtable) and a commit log. An in-memory table exists for each column family and allows the value to be stored instantly. Technically, it is a map with concurrent access based on a data structure called a "skip list." There is one log for the entire keyspace, and it is stored on disk. The log is a sequence of modification operations.

 


The commit log is also split into segments when it reaches a certain size. This organization means that write speed is bounded by the speed of sequential writes to the hard disk while data durability is still guaranteed. In case of a node crash, the commit log is read when the Cassandra service starts, and all in-memory tables are restored from it. Write speed is thus limited by sequential disk writes, which is about 100 MB/s for modern hard drives. For this reason, it is advisable to put the commit log on a separate disk.

Clearly, sooner or later the memory may fill up, so the in-memory table also has to be saved to disk. To determine when to save it, there is a limit on the memory occupied by in-memory tables (memtable_total_space_in_mb). By default, it is ⅓ of the maximum Java heap size. When in-memory tables grow beyond this limit, Cassandra creates a new in-memory table and writes the old one to disk as a saved table (SSTable). A saved table is immutable: once created, it is never modified again. After the save to disk, the corresponding parts of the commit log are marked as free, which releases the disk space occupied by the log. Note that the log interleaves data from different column families in the keyspace, so some parts may not be freed because they still correspond to data held in other in-memory tables.

 

As a result, each column family corresponds to one table in memory and some number of saved tables. When a node processes a read request, it needs to query all these structures and select the value with the most recent timestamp.
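
The write and read paths described in this section can be condensed into a toy model. Everything here is a simplification made for illustration: the class `ColumnFamilyStore` is hypothetical, the memtable limit counts entries instead of bytes, the "disk" structures are ordinary Python objects, and the whole log is cleared on flush because the toy has only one column family.

```python
class ColumnFamilyStore:
    """Toy write path: commit log, memtable, and immutable saved tables."""

    def __init__(self, memtable_limit: int):
        self.memtable_limit = memtable_limit  # max entries, stands in for a size limit
        self.commit_log = []  # append-only list standing in for the on-disk log
        self.memtable = {}    # (key, column name) -> (value, timestamp)
        self.sstables = []    # flushed memtables, never modified afterwards

    def write(self, key, name, value, timestamp):
        self.commit_log.append((key, name, value, timestamp))  # durability first
        current = self.memtable.get((key, name))
        if current is None or timestamp >= current[1]:
            self.memtable[(key, name)] = (value, timestamp)
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # Freeze the memtable as a saved table and start a new one.
        self.sstables.append(dict(self.memtable))
        self.memtable = {}
        self.commit_log.clear()  # flushed entries no longer need replaying

    def read(self, key, name):
        # Check every structure and keep the value with the newest timestamp.
        tables = [self.memtable, *self.sstables]
        candidates = [t[(key, name)] for t in tables if (key, name) in t]
        return max(candidates, key=lambda pair: pair[1], default=None)


store = ColumnFamilyStore(memtable_limit=2)
store.write("k1", "color", "red", 1)
store.write("k2", "color", "green", 2)  # reaches the limit, triggers a flush
store.write("k1", "color", "blue", 3)   # newer value stays in the memtable
print(store.read("k1", "color"), len(store.sstables))
```

The read of `k1` has to consult both the memtable and the saved table, which is exactly the cost that the mechanisms discussed next try to reduce.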

 

There are three mechanisms to speed up this process: the bloom filter, the key cache, and the row cache:

  • a bloom filter is a data structure that takes little space and answers the question of whether an element (in our case, a key) is contained in a set. If the answer is "no," it is 100% certain; if the answer is "yes," it may be a false positive. This reduces the number of reads from saved tables;
  • the key cache stores the position of the record on disk for each key, thus reducing the number of seek operations during searches in a saved table;
  • the row cache stores the whole record, which makes it possible to avoid disk reads altogether.
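
A bloom filter is small enough to sketch in full. The version below is a minimal teaching example, not the filter Cassandra uses: the class name, the bit-array size, and the trick of salting md5 to obtain several hash functions are all assumptions of this sketch.

```python
import hashlib


class BloomFilter:
    def __init__(self, size_bits: int = 1024, hash_count: int = 3):
        self.size_bits = size_bits
        self.hash_count = hash_count
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: bytes):
        # Derive several bit positions by salting one hash function.
        for seed in range(self.hash_count):
            digest = hashlib.md5(bytes([seed]) + key).digest()
            yield int.from_bytes(digest, "big") % self.size_bits

    def add(self, key: bytes) -> None:
        for position in self._positions(key):
            self.bits |= 1 << position

    def might_contain(self, key: bytes) -> bool:
        # False is definite; True may be a false positive.
        return all((self.bits >> position) & 1 for position in self._positions(key))


bloom = BloomFilter()
bloom.add(b"row-1")
print(bloom.might_contain(b"row-1"))  # always True for a key that was added
```

A node could keep one such filter per saved table and skip the disk read whenever `might_contain` returns False.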

 

Compaction

At some point, data in the column family will be overwritten - columns will arrive with the same name and key as existing ones. A situation then arises where an older saved table and a newer one contain the old and the new data. To guarantee integrity, Cassandra must read all these saved tables and select the data with the latest timestamp.

 

It turns out that the number of disk seek operations during a read is proportional to the number of saved tables. Therefore, there is a compaction process that frees overwritten data and reduces the number of saved tables. It reads several saved tables sequentially and writes a new saved table that merges the data by timestamp. When the new table is completely written and put into use, the compaction process can release the source tables (the tables from which it was formed). Thus, if the tables contained overwritten data, this redundancy is eliminated. Clearly, during such an operation the amount of redundancy temporarily increases - the new saved table exists on disk along with the source tables - which means there must always be enough free disk space for compaction to be performed.
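
The merge at the heart of compaction can be sketched as follows. Saved tables are modeled as plain dictionaries and the function name `compact` is hypothetical; a real implementation streams sorted files from disk rather than holding them in memory.

```python
def compact(sstables: list) -> dict:
    """Merge saved tables, keeping the newest (value, timestamp) per column."""
    merged = {}
    for table in sstables:
        for column_id, (value, timestamp) in table.items():
            if column_id not in merged or timestamp > merged[column_id][1]:
                merged[column_id] = (value, timestamp)
    return merged


older = {("k1", "color"): ("red", 1), ("k2", "color"): ("green", 2)}
newer = {("k1", "color"): ("blue", 3)}
compacted = compact([older, newer])
print(compacted)
```

The source tables are left untouched while the merged table is built, which mirrors the point above: both copies exist side by side until the new table is put into use.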

 

Cassandra allows you to choose one of two strategies for performing compaction:

  • Size-tiered compaction - this strategy compacts two tables selected in a particular way. It is applied automatically as background compaction (minor compaction) and manually for full compaction (major compaction). With this strategy a key may be found in many tables, and a seek operation is required for each such table.
  • Leveled compaction - this strategy compacts saved tables, which are initially created small (5 MB), by grouping them into levels. Each level is ten times larger than the previous one. It provides the following guarantees: 90% of read requests will go to a single saved table, and only 10% of disk space will be used for obsolete data. In this case, the temporary table needed for compaction requires only 10 times the table size, i.e., 50 MB.

 

Deletion operations

Internally, column deletion operations are writes of a special value - a tombstone. When such a value is encountered during a read, it is skipped, as if the column had never existed. As a result of compaction, tombstones gradually displace the obsolete real values and may eventually disappear altogether. If, however, columns with real data and even newer timestamps appear, they will in turn supersede these tombstones.
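
A minimal sketch of this behavior, assuming a table modeled as a dictionary of (value, timestamp) pairs; the names `TOMBSTONE`, `delete`, and `read` are hypothetical and chosen only for this example.

```python
TOMBSTONE = object()  # marker value written in place of a deleted column


def delete(table: dict, column_id, timestamp: int) -> None:
    """A delete is just a write of the tombstone marker."""
    current = table.get(column_id)
    if current is None or timestamp > current[1]:
        table[column_id] = (TOMBSTONE, timestamp)


def read(table: dict, column_id):
    """Return the live value, or None if the column is missing or deleted."""
    entry = table.get(column_id)
    if entry is None or entry[0] is TOMBSTONE:
        return None
    return entry[0]


table = {("k1", "color"): ("red", 1)}
delete(table, ("k1", "color"), timestamp=2)
print(read(table, ("k1", "color")))   # None: the value is hidden by the tombstone
table[("k1", "color")] = ("blue", 3)  # a newer write supersedes the tombstone
print(read(table, ("k1", "color")))   # blue
```

Because the tombstone carries a timestamp like any other column, it takes part in the same newest-wins comparison during reads and compaction.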

 

Transactionality

Cassandra supports transactionality at the level of a single record, that is, for a set of columns with a single key. Here's how the four ACID requirements are met:

  • atomicity - all columns of a single record written in a single operation will either all be written or not written at all;
  • consistency - as mentioned above, it is possible to use queries with strong consistency at the expense of availability and thereby meet this requirement;
  • isolation - since Cassandra version 1.1, isolation has been supported: while one user writes columns of a record, another user reading the same record will see either the complete old version of the record or, after the operation, the new version, but not some columns from one and some from the other;
  • durability - ensured by the commit log, which is replayed to restore the node to the desired state in case of failure.

 

If you want to explore more about Apache Cassandra, check our articles:

  1. What is Apache Cassandra?
  2. Apache Cassandra vs. MongoDB
  3. 10 Apache Cassandra use cases in 5 Big Data directions