Introduction
The Apache Cassandra architecture is designed to provide scalability, availability, and reliability for storing massive amounts of data. If you are new to Cassandra, we recommend going through the high-level concepts covered in "What is Cassandra" before diving into the architecture.
This blog post aims to cover all the architecture components of Cassandra. After reading the post, you will have a basic understanding of the components. This can be used as a basis to learn about the Cassandra Data Model, to design your own Cassandra cluster, or simply for Cassandra knowledge.
Cluster Topology and Design
Cassandra is based on a distributed system architecture. In its simplest form, Cassandra can be installed on a single machine or in a Docker container, and it works well for basic testing. A single Cassandra instance is called a node. Cassandra supports horizontal scalability, achieved by adding more than one node as part of a Cassandra cluster. Performance scales linearly with the number of nodes if the resources are configured optimally.
Cassandra uses a peer-to-peer architecture, with each node connected to all other nodes. Each Cassandra node performs all database operations and can serve client requests without the need for a master node. As a result of the peer-to-peer distributed architecture, a Cassandra cluster does not have a single point of failure.
Nodes in a cluster communicate with each other for various purposes. There are various components used in this process:
- Seeds: Each node configures a list of seeds which is simply a list of other nodes. A seed node is used to bootstrap a node when it is first joining a cluster. A seed does not have any other specific purpose, and it is not a single point of failure. A node does not require a seed on subsequent restarts after bootstrap. It is recommended to use two to three seed nodes per Cassandra data center (data centers are explained below), and keep the seeds list uniform across all the nodes.
- Gossip: Gossip is the protocol used by Cassandra nodes for peer-to-peer communication. Gossip informs a node about the state of all other nodes. A node gossips with up to three other nodes every second. Gossip messages follow a specific format and carry version numbers so that nodes can exchange state efficiently.
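The state exchange behind gossip can be sketched in a few lines of Python. This is a toy model, not Cassandra's actual protocol (which exchanges digests and is more involved): each node keeps a view of `node -> (version, status)`, and when two nodes gossip, both keep the entry with the higher version. The names `merge` and `gossip_round` are hypothetical and used only for illustration.

```python
def merge(local, remote):
    """Combine two views, keeping the highest-versioned entry per node."""
    merged = dict(local)
    for node, (version, status) in remote.items():
        if node not in merged or version > merged[node][0]:
            merged[node] = (version, status)
    return merged


def gossip_round(views, pairs):
    """Let each (a, b) pair exchange state; both end up with the merged view."""
    for a, b in pairs:
        combined = merge(views[a], views[b])
        views[a] = dict(combined)
        views[b] = dict(combined)


# Each node initially knows only about itself.
views = {
    "n1": {"n1": (1, "UP")},
    "n2": {"n2": (1, "UP")},
    "n3": {"n3": (1, "UP")},
}
gossip_round(views, [("n1", "n2"), ("n2", "n3"), ("n3", "n1")])
print(sorted(views["n1"]))
```

After a few exchanges every node holds state for every other node, even though no node talked to all of its peers directly in a single step.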
A cluster is subdivided into racks and data centers. These terms are Cassandra's representation of a real-world rack and data center. A physical rack is a group of bare-metal servers sharing resources such as a network switch and power supply. In Cassandra, nodes can be grouped into racks and data centers through the snitch configuration. Ideally, the logical placement should mirror the placement of nodes in actual data centers and racks. Data replication and placement depend on the rack and data center configuration.
If there are enough replicas and the keyspace uses NetworkTopologyStrategy (explained later), each rack in Cassandra holds a complete replica of the data. This configuration allows Cassandra to survive a rack failure without losing a significant level of replication.
There are various scenarios for using multiple data centers in Cassandra. A few common scenarios are:
- Build a Cassandra cluster with geographically distinct data centers which cater to clients from distinct locations, e.g. a cluster with three data centers in US, EU, and APAC serving local clients with low latency.
- Separate Cassandra data centers which cater to distinct workloads using the same data, e.g. separate data centers to serve client requests and to run analytics jobs.
- Active disaster recovery by creating geographically distinct data centers, e.g. a cluster with data centers in each US AWS region to support disaster recovery.
Database structures
Cassandra stores data in tables, where each table is organized in rows and columns as in any other database. A Cassandra table was formerly referred to as a column family. Tables are grouped in keyspaces. A keyspace can be used to group tables serving a similar purpose from a business perspective, such as all transactional tables, metadata tables, or user information tables. Data replication is configured per keyspace in terms of the replication factor per data center and the replication strategy. See the replication section for more details.
Each table has a defined primary key. The primary key is divided into a partition key and clustering columns. The clustering columns are optional. Cassandra does not enforce a uniqueness constraint on writes: inserting a row whose primary key already exists overwrites the existing row instead of raising an error.
The partition key is used by Cassandra to index the data. All rows which share a common partition key make a single data partition which is the basic unit of data partitioning, storage, and retrieval in Cassandra.
Refer to Cassandra-data-partitioning for detailed information about this topic.
Partitioning
A partition key is converted to a token by a partitioner. There are various partitioner options available in Cassandra, of which Murmur3Partitioner is used by default. The tokens are signed integer values between -2^63 and +2^63-1, and this range is referred to as the token range. Each Cassandra node owns a portion of this range and primarily owns the data corresponding to it. A token is used to precisely locate the data among the nodes and on the data storage of the corresponding node.
It is evident that when there is only one node in a cluster, it owns the complete token range. As more nodes are added, the token range ownership is split between the nodes, and each node is aware of the range of all the other nodes.
Here is a simplified example to illustrate the token range assignment. Suppose only 100 tokens are used for a Cassandra cluster with three nodes. Each node is assigned approximately 33 tokens:
node1: 0-33
node2: 34-66
node3: 67-99
If there are nodes added or removed, the token range distribution should be shuffled to suit the new topology. This process takes a lot of calculation and configuration change for each cluster operation.
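The lookup from token to owning node can be sketched with a sorted list and a binary search. The sketch below reuses the 100-token, three-node example; `toy_token` is a hypothetical stand-in for the partitioner and is not Murmur3.

```python
import hashlib
from bisect import bisect_left

# Inclusive upper bound of each node's token range, in ring order.
RING = [(33, "node1"), (66, "node2"), (99, "node3")]


def toy_token(partition_key, token_count=100):
    """Hash a partition key into [0, token_count). Illustrative only, not Murmur3."""
    digest = hashlib.md5(str(partition_key).encode()).digest()
    return int.from_bytes(digest[:8], "big") % token_count


def owner(token, ring=RING):
    """Return the node whose range contains the token."""
    bounds = [upper for upper, _ in ring]
    index = bisect_left(bounds, token)
    return ring[index % len(ring)][1]


print(owner(0), owner(34), owner(99))  # node1 node2 node3
print(owner(toy_token("user-42")))
```

Adding or removing a node in this model means recomputing every entry in `RING`, which is the manual reshuffling described above.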
Virtual nodes/Vnodes
To simplify the token calculation complexity and other token assignment difficulties, Cassandra uses the concept of virtual nodes, referred to as Vnodes. A cluster is divided into a large number of virtual nodes for token assignment. Each physical node is assigned an equal number of virtual nodes. In our previous example, if each node is assigned three Vnodes, each Vnode covers approximately 11 tokens:
v1: 0-10, v2: 11-21, v3: 22-32, and so on
Each physical node is assigned these Vnodes as:
node1: v1, v4, v7
node2: v2, v5, v8
node3: v3, v6, v9
The default number of Vnodes owned by a node in Cassandra is 256 (reduced to 16 as of Cassandra 4.0), which is set by the num_tokens property. When a node is added to a cluster, the token allocation algorithm allocates tokens to the node. By default the algorithm selects random token values, which tends toward a uniform distribution when each node owns many tokens. The num_tokens property can also be changed to tune data distribution. The figure of 256 Vnodes per physical node was chosen to achieve uniform data distribution for clusters of any size and with any replication factor. In some large clusters, 256 Vnodes do not perform well; please refer to the blog cassandra-vnodes-how-many-should-i-use for more information.
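As a rough sketch of the Vnode idea, the following Python splits a toy 100-token ring into nine contiguous Vnodes and hands them out round-robin to three nodes. This is a simplification for illustration: real Cassandra picks token values per node rather than slicing the ring evenly, and the helper names `split_ring` and `assign_round_robin` are hypothetical.

```python
def split_ring(token_count, vnode_count):
    """Split [0, token_count) into contiguous ranges; the last range absorbs the remainder."""
    size = token_count // vnode_count
    ranges = []
    for i in range(vnode_count):
        start = i * size
        end = token_count - 1 if i == vnode_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def assign_round_robin(vnode_ranges, nodes):
    """Give Vnode i to node i modulo the number of nodes."""
    ownership = {node: [] for node in nodes}
    for i, token_range in enumerate(vnode_ranges):
        ownership[nodes[i % len(nodes)]].append(token_range)
    return ownership


vnodes = split_ring(100, 9)
ownership = assign_round_robin(vnodes, ["node1", "node2", "node3"])
for node, ranges in ownership.items():
    print(node, ranges)
```

Because each node owns several small, scattered ranges, adding a fourth node only requires moving a few Vnodes instead of recalculating one large range per node.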
Replication
The data in each keyspace is replicated with a replication factor. The most common replication factor used is three. There is one primary replica of data that resides with the token owner node, as explained in the data partitioning section. The remaining replicas are placed by Cassandra on specific nodes using the replica placement strategy. All replicas are equally important for all database operations except for a few cluster mutation operations.
There are two settings that mainly impact replica placement. The first is the snitch, which determines the data center and the rack a Cassandra node belongs to, and it is set at the node level. The snitch informs Cassandra about the network topology so that requests are routed efficiently, and allows Cassandra to distribute replicas by grouping machines into data centers and racks. GossipingPropertyFileSnitch is the go-to snitch for any cluster deployment. It uses a configuration file called cassandra-rackdc.properties on each node, which contains the names of the rack and data center hosting the node. There are also cloud-specific snitches available for AWS and GCP.
The second setting is the replication strategy, which is set at the keyspace level. There are two strategies: SimpleStrategy and NetworkTopologyStrategy. SimpleStrategy does not consider racks or multiple data centers; it places data replicas on nodes sequentially. NetworkTopologyStrategy is rack aware and data center aware. SimpleStrategy should only be used for temporary and small cluster deployments; for all other clusters, NetworkTopologyStrategy is highly recommended. A keyspace definition used with NetworkTopologyStrategy specifies the number of replicas per data center as:
    cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy', 'dc_1' : 3, 'dc_2' : 1};
Here, the keyspace named ks is replicated in dc_1 with factor three and in dc_2 with factor one.
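To make rack-aware and data-center-aware placement more concrete, here is a simplified Python sketch. It is not Cassandra's actual algorithm: it walks a toy ring from the token owner and, for each data center, picks nodes on racks that have not been used yet, falling back to already-used racks only if it runs short. The ring contents, node names, and the function `place_replicas` are assumptions made for the example.

```python
def place_replicas(ring, start_index, replicas_per_dc):
    """ring: list of (node, data_center, rack) in token order.

    Returns the chosen replica nodes, grouped by data center in the
    order given by replicas_per_dc.
    """
    placement = []
    for dc, wanted in replicas_per_dc.items():
        picked, same_rack, racks_used = [], [], set()
        for step in range(len(ring)):
            node, node_dc, rack = ring[(start_index + step) % len(ring)]
            if node_dc != dc:
                continue
            if rack in racks_used:
                same_rack.append(node)  # keep as a fallback candidate
            elif len(picked) < wanted:
                picked.append(node)
                racks_used.add(rack)
        # Not enough distinct racks: reuse racks in ring order.
        picked.extend(same_rack[: wanted - len(picked)])
        placement.extend(picked)
    return placement


ring = [
    ("a1", "dc_1", "r1"), ("b1", "dc_2", "r1"), ("a2", "dc_1", "r1"),
    ("a3", "dc_1", "r2"), ("a4", "dc_1", "r3"), ("b2", "dc_2", "r2"),
]
print(place_replicas(ring, 0, {"dc_1": 3, "dc_2": 1}))
```

With three replicas and three racks in dc_1, each rack ends up holding one replica, which is what lets the cluster tolerate the loss of a whole rack.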
Consistency and Availability
Every distributed system is subject to the CAP theorem. The CAP theorem states that a distributed system can strongly deliver only two out of three properties: Consistency, Availability, and Partition tolerance. Cassandra provides flexibility for choosing between consistency and availability while querying data. In other words, data can be highly available with a low consistency guarantee, or it can be highly consistent with lower availability. For example, if there are three data replicas, a query reading or writing data can ask for acknowledgements from one, two, or all three replicas to mark the completion of the request. For a read request, Cassandra requests the data from the required number of replicas and compares their write-timestamps. The replica with the latest write-timestamp is considered to hold the correct version of the data. Hence, involving more replicas in a read operation strengthens the data consistency guarantee. For write requests, the requested number is the number of replicas that must acknowledge the write.
Naturally, the time required to get acknowledgements from replicas increases with the number of replicas asked to acknowledge, and the request fails if fewer than that number are available. Hence, consistency and availability can be traded off against each other. The concept of requesting a certain number of acknowledgements is called tunable consistency, and it can be applied at the individual query level.
There are a few considerations related to data availability and consistency:
- The replication factor should ideally be an odd number. The common replication factor used is three, which provides a balance between replication overhead, data distribution, and consistency for most workloads.
- The number of racks in a data center should be in multiples of the replication factor. The common number used for nodes is in multiples of three.
- There are various terms used to refer to the consistency levels:
  - One, two, three: The specified number of replicas must acknowledge the operation.
  - Quorum: A strict majority of replicas is called a quorum. The majority is one more than half of the replicas, with the half rounded down. This consistency level ensures that most of the replicas confirm the operation without having to wait for all replicas. It balances operation efficiency and good consistency, e.g. quorum for a replication factor of three is (3/2)+1=2 using integer division; for a replication factor of five it is (5/2)+1=3.
  - Local_*: This is a consistency level for a local data center in a multi-data center cluster. A local data center is where the client is connected to a coordinator node. The * takes the value one or quorum, e.g. local_one, local_quorum.
  - Each_*: This level is also related to a multi-data center setup. It denotes the consistency to be achieved in each of the data centers independently, e.g. each_quorum means quorum consistency in each data center.
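The quorum arithmetic is easy to check in code. The sketch below computes the quorum size with integer division and also applies the general rule that a read is guaranteed to see the latest acknowledged write when the read and write replica counts together exceed the replication factor (R + W > RF). That rule is standard quorum reasoning rather than something stated above, and the function names are hypothetical.

```python
def quorum(replication_factor):
    """Strict majority of replicas, using integer division."""
    return replication_factor // 2 + 1


def read_sees_latest_write(read_replicas, write_replicas, replication_factor):
    """True when every read set must overlap every write set (R + W > RF)."""
    return read_replicas + write_replicas > replication_factor


print(quorum(3), quorum(5))               # 2 3
print(read_sees_latest_write(2, 2, 3))    # True: quorum reads + quorum writes
print(read_sees_latest_write(1, 1, 3))    # False: consistency level one for both
```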
Data written and read at a low consistency level still benefits from replication. Cassandra keeps the data consistent across all replicas, but this happens in the background. This concept is referred to as eventual consistency. In the three-replica example, if a user queries data at consistency level one, the query is acknowledged when the read or write has happened on a single replica. In the case of a read operation, this could mean relying on a single data replica as the source of truth for the data. In the case of a write operation, the remaining replicas receive the data later and are made consistent eventually. If replication fails, some replicas might not get the data. Cassandra handles replication shortcomings with a mechanism called anti-entropy, which is covered later in the post.
Query Interface
Cassandra Query Language (CQL) is the interface for querying Cassandra over a binary protocol. Earlier versions of Cassandra supported Thrift, which is now entirely replaced by CQL. CQL is designed to be similar to SQL for a quicker learning curve and familiar syntax. The DDL operations create keyspaces and tables; the CRUD operations are select, insert, update, and delete, where select is a Cassandra read operation and all others are Cassandra write operations.
A table definition includes column definitions and primary, partition, and clustering keys. The table definition also contains several settings for data storage and maintenance. The primary key is a combination of the partition key and clustering columns. The clustering columns are optional. The partition key can be a single column or a composite key.
The query set available in CQL is quite limited as compared to SQL. A few highlights:
- Cassandra does not support join operations and nested queries.
- Each select query should specify a complete partition key. It is possible to query multiple partitions, but it is not recommended. Refer to Cassandra scalability.
- Cassandra supports a limited set of data aggregation operations.
- The order by clause can be used only for columns in the clustering key. Also, those should be used in the correct order of precedence.
The reason for a limited query set in Cassandra comes from specific data modelling requirements. The data model for a Cassandra database should be aimed to create denormalized tables which can cater to the select query patterns. Cassandra data modelling is one of the essential operations while designing the database. All the features provided by Cassandra architecture like scalability and reliability are directly subject to an optimum data model.
The Cassandra driver program provides a toolset for connection management, pooling, and querying. The driver creates a connection with a Cassandra node, which is then referred to as the coordinator node for the query. The coordinator is responsible for executing the query and aggregating partial results.
The DataStax Java Driver is the most popular, efficient, and feature-rich driver available for Cassandra. Drivers for several other languages provide similar functionality.
Data Storage
Cassandra uses a commit log for each incoming write request on a node. Commit log is a write-ahead log, and it can be replayed in case of failure. The on-disk data structure is called SSTable. SSTables are created per table in the database.
Example:
Consider a sample keyspace and table created as follows.
    cqlsh> create keyspace ks with replication = {'class' : 'NetworkTopologyStrategy','datacenter_1' : 3};
    cqlsh> CREATE TABLE ks.tb ( id int PRIMARY KEY, col1 text);

And insert some data:

    cqlsh> insert into ks.tb (id, col1) values (1, 'first_row');
    cqlsh> insert into ks.tb (id, col1) values (2, 'second_row');
    cqlsh> insert into ks.tb (id, col1) values (3, 'third_row');
The data we inserted looks as given below in an SSTable.
Note that this representation is obtained by a utility to generate human-readable data from SSTables. The actual data in SSTables is in binary format and compressed for efficiency.
    [
      {
        "partition" : {
          "key" : [ "1" ],
          "position" : 0
        },
        "rows" : [
          {
            "type" : "row",
            "position" : 33,
            "liveness_info" : { "tstamp" : "2020-04-14T13:22:07.756013Z" },
            "cells" : [
              { "name" : "col1", "value" : "first_row" }
            ]
          }
        ]
      },
      {
        "partition" : {
          "key" : [ "2" ],
          "position" : 34
        },
        "rows" : [
          {
            "type" : "row",
            "position" : 71,
            "liveness_info" : { "tstamp" : "2020-04-14T13:22:29.923397Z" },
            "cells" : [
              { "name" : "col1", "value" : "second_row" }
            ]
          }
        ]
      },
      {
        "partition" : {
          "key" : [ "3" ],
          "position" : 72
        },
        "rows" : [
          {
            "type" : "row",
            "position" : 108,
            "liveness_info" : { "tstamp" : "2020-04-14T13:22:39.282459Z" },
            "cells" : [
              { "name" : "col1", "value" : "third_row" }
            ]
          }
        ]
      }
    ]
Cassandra maintains immutability for data storage to provide optimal performance. Hence, SSTables are immutable. Updates and deletes are handled by writing a new version of the data. This strategy results in multiple versions of data existing at any given time. Cassandra is optimized for write operations more than for read operations. A read operation consolidates all versions of the data and returns the most recent version. Each data cell is written with a write-timestamp which specifies the time when that particular data was written. This timestamp is used to find the latest version of the data when retrieving it for a read operation.
In the above example, we update the col1 value of the row with id 1 and see the result:
    cqlsh> update ks.tb set col1='updated_row_one' where id=1;
The resulting data in the SSTable for this update looks like:
    [
      {
        "partition" : {
          "key" : [ "1" ],
          "position" : 0
        },
        "rows" : [
          {
            "type" : "row",
            "position" : 39,
            "cells" : [
              { "name" : "col1", "value" : "updated_row_one", "tstamp" : "2020-04-14T13:38:37.794697Z" }
            ]
          }
        ]
      }
    ]
The data looks precisely the same as the newly inserted data. Cassandra identifies this and considers the updated value as it has a greater timestamp value.
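The "latest timestamp wins" rule can be sketched in Python using the two versions of `col1` from the example. The function name `latest_version` is hypothetical; the sketch relies on the fact that ISO-8601 timestamps in the same format and time zone sort correctly as plain strings.

```python
def latest_version(versions):
    """versions: list of (write_timestamp, value); return the newest value."""
    return max(versions, key=lambda version: version[0])[1]


# The original insert and the later update of col1 for id 1.
versions = [
    ("2020-04-14T13:22:07.756013Z", "first_row"),
    ("2020-04-14T13:38:37.794697Z", "updated_row_one"),
]
print(latest_version(versions))  # updated_row_one
```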
Deletes are handled in a special way in Cassandra to make them compatible with immutable data. Each delete is recorded as a new record which marks the deletion of the referenced data. This special data record is called a tombstone. A Cassandra read operation discards all the information for a row or cell if a tombstone exists, as it denotes deletion of the data. There are various types of tombstones to denote data deletion for each element, e.g. cell, row, partition, or range of rows. Cassandra allows setting a Time To Live (TTL) on a data row so that it expires a specified amount of time after insertion. Data past its TTL is regarded as a tombstone in Cassandra. Refer to managing-tombstones-in-Cassandra for operational information about tombstones and their efficiency.
Now with the SSTable example, a cell delete looks like:
    cqlsh> delete col1 from ks.tb where id=1;

    [
      {
        "partition" : {
          "key" : [ "1" ],
          "position" : 0
        },
        "rows" : [
          {
            "type" : "row",
            "position" : 24,
            "cells" : [
              { "name" : "col1", "deletion_info" : { "local_delete_time" : "2020-04-14T13:44:27Z" },
                "tstamp" : "2020-04-14T13:44:27.179254Z"
              }
            ]
          }
        ]
      }
    ]
The deletion_info indicates that the cell is deleted. This data is the tombstone for the original data and all the data versions.
Cassandra performs a compaction operation on SSTables, which consolidates two or more SSTables to form a new SSTable. This process combines all versions of data in the participating SSTables and writes a single version of each piece of data to the resulting SSTable. Compactions also purge the data associated with a tombstone if all the required conditions for purging are met. There are various strategies to trigger and perform compaction (refer to apache-Cassandra-compactions for details):
- SizeTieredCompactionStrategy (STCS): This is the default compaction strategy. It is triggered using the size of SSTables on-disk.
- LeveledCompactionStrategy (LCS): This strategy is used to optimize read performance. It considers the data partitions present in SSTables and arranges SSTables in levels. Each level has a fixed set of tables, and those are compacted with each other.
- TimeWindowCompactionStrategy (TWCS): This is a specialized strategy for time series data. It arranges SSTables in time window buckets defined in the table definition. The SSTables within a time window are only compacted with each other.
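Independently of which strategy triggers it, the merge step of a compaction can be sketched as follows. This is a toy model with hypothetical names: each SSTable is a dict of `key -> (timestamp, value)`, a sentinel object stands in for a tombstone, and `purge_before` is an assumed threshold representing "the conditions for purging are met".

```python
TOMBSTONE = object()  # sentinel marking a deleted value


def compact(sstables, purge_before=None):
    """Merge SSTables, keeping the newest version of each key.

    Tombstones older than purge_before are dropped together with the
    data they shadow. In this toy model all shadowed data is assumed to
    be present in the SSTables being compacted.
    """
    merged = {}
    for table in sstables:
        for key, (timestamp, value) in table.items():
            if key not in merged or timestamp > merged[key][0]:
                merged[key] = (timestamp, value)
    if purge_before is not None:
        merged = {
            key: (timestamp, value)
            for key, (timestamp, value) in merged.items()
            if not (value is TOMBSTONE and timestamp < purge_before)
        }
    return dict(sorted(merged.items()))


older = {1: (10, "first_row"), 2: (11, "second_row")}
newer = {1: (20, "updated_row_one"), 2: (21, TOMBSTONE)}
print(compact([older, newer], purge_before=30))
```

The output SSTable holds one version per key, and the deleted key disappears entirely once its tombstone is old enough to purge.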
The other crucial set of operations performed in Cassandra is anti-entropy. The aim of these operations is to keep data as consistent as possible. The anti-entropy enables Cassandra to provide the eventual consistency model.
- Hinted Handoff: If a node in Cassandra is not available for a short period, the data which is supposed to be replicated on the node is stored on a peer node. This data is called hints. Once the original node becomes available, the hints are transferred to the node, and the node is caught up with missed data. There are time and storage restrictions for hints. If a node is not available for a longer duration than configured, no hints are saved for it. Hints cannot be considered as a primary anti-entropy mechanism.
- Read Repair: Read operation is used as an opportunity to repair inconsistent data across replicas. The latest write-timestamp is used as a marker for the correct version of data. The read repair operation is performed only in a portion of the total reads to avoid performance degradation. Read repairs are opportunistic operations and not a primary operation for anti-entropy.
- Repair: Repair is the primary anti-entropy operation to make data consistent across replicas. Repairs are performed by creating specialized data structures called Merkle trees. These are trees of hash values computed over the data in a replica. The trees are then transferred to other replicas and compared to detect inconsistencies. The correct data is then streamed across nodes to repair the inconsistencies.
Repairs need to be scheduled manually as these are intensive operations that consume a significant amount of cluster resources.
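The idea behind Merkle-tree comparison can be illustrated with a small Python sketch. It is a simplified model with hypothetical helper names: each replica's data is split into ranges, each range is hashed, and parent hashes are built pairwise up to a single root. Equal roots mean the replicas agree; otherwise the per-range hashes show which ranges need streaming. A real implementation would walk down the tree rather than compare all leaves.

```python
import hashlib


def _hash(data):
    return hashlib.sha256(data).digest()


def merkle_root(ranges):
    """ranges: list of bytes, one entry per token range. Returns the root hash."""
    level = [_hash(chunk) for chunk in ranges]
    if not level:
        return _hash(b"")
    while len(level) > 1:
        if len(level) % 2:  # duplicate the last hash on odd-sized levels
            level.append(level[-1])
        level = [_hash(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


def differing_ranges(replica_a, replica_b):
    """Indices of ranges whose hashes differ between two replicas."""
    return [
        index
        for index, (a, b) in enumerate(zip(replica_a, replica_b))
        if _hash(a) != _hash(b)
    ]


replica_a = [b"range-1-data", b"range-2-data", b"range-3-data", b"range-4-data"]
replica_b = [b"range-1-data", b"range-2-STALE", b"range-3-data", b"range-4-data"]
if merkle_root(replica_a) != merkle_root(replica_b):
    print("ranges to repair:", differing_ranges(replica_a, replica_b))
```

Only hashes cross the network during the comparison; actual data is streamed just for the ranges that differ.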
Write Path
The Cassandra write path is the process followed by a Cassandra node to store data in response to a write operation. A coordinator node initiates the write path and is responsible for the request completion.
The high-level steps are as follows:
- The partitioner applies a hash to the partition key of an incoming data partition and generates a token.
- The node that owns the partition is identified, along with all the nodes that hold replicas of the partition.
- The write request is forwarded to all replica nodes, and acknowledgement is awaited.
- Once the number of nodes required to fulfil the write consistency level have acknowledged the request, the write operation completes.
Consider an example with a six-node cluster, a replication factor of three, and a write request consistency of quorum:
Quorum for RF 3 = (3/2)+1 = 2
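The coordinator's decision can be sketched in Python. The sketch is a toy model: `Unavailable` is a hypothetical exception class defined here, node names are made up, and a replica that is down is simply recorded as needing a hint rather than having a hint stored anywhere.

```python
class Unavailable(Exception):
    """Raised when too few replicas can acknowledge (hypothetical class)."""


def coordinate_write(replicas, alive, required_acks):
    """Return (acknowledging replicas, replicas needing hints)."""
    acks = [node for node in replicas if node in alive]
    hints = [node for node in replicas if node not in alive]
    if len(acks) < required_acks:
        raise Unavailable(f"need {required_acks} acks, got {len(acks)}")
    return acks, hints


replication_factor = 3
required = replication_factor // 2 + 1          # quorum = 2
replicas = ["n1", "n2", "n3"]                   # replicas for this partition
alive = {"n1", "n2", "n4", "n5", "n6"}          # n3 is down in a six-node cluster

acks, hints = coordinate_write(replicas, alive, required)
print("acknowledged by:", acks, "hint needed for:", hints)
```

With two of three replicas up, the quorum write succeeds and the missing replica is a candidate for hinted handoff; with only one replica up, the coordinator would raise instead.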
Common error scenarios:
- If the number of nodes required to fulfil the request are not available or do not return the requested acknowledgement, the coordinator throws an exception.
- Even after satisfying the request with the required number of replica acknowledgements, if an additional node that stores a replica for the data is not available, the data could be saved as a hint on another node.
In a multi-data center cluster, the coordinator forwards write requests to all applicable local nodes. For the remote data centers, the write request is forwarded to a single node per data center. That node then forwards the write to the other replicas in its own data center so that the required number of nodes can satisfy the consistency level.
The Anatomy of a Write Operation on a Node
This operation involves the commit log, memtable, and SSTable. The commit log is a write-ahead log that is stored on disk. The write operation is recorded in the commit log of a node and stored in a memtable, which is an in-memory structure representing an SSTable on disk, and the acknowledgement is returned. The memtable is flushed to disk after reaching a memory threshold, which creates a new SSTable. The SSTables are eventually compacted to consolidate the data and optimize read performance.
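A minimal Python sketch of this flow is shown below. The class `Node` and the `memtable_limit` parameter are hypothetical: the limit counts entries instead of bytes, and everything lives in memory, so the "commit log" and "SSTables" here are plain lists used only to show the order of operations.

```python
class Node:
    def __init__(self, memtable_limit=2):
        self.commit_log = []    # stand-in for the on-disk write-ahead log
        self.memtable = {}      # key -> (timestamp, value), in memory
        self.sstables = []      # list of immutable, sorted snapshots
        self.memtable_limit = memtable_limit

    def write(self, key, value, timestamp):
        self.commit_log.append((key, value, timestamp))  # log the write
        self.memtable[key] = (timestamp, value)          # apply it in memory
        if len(self.memtable) >= self.memtable_limit:
            self.flush()
        return "ack"

    def flush(self):
        """Turn the memtable into a new SSTable and start a fresh one."""
        self.sstables.append(dict(sorted(self.memtable.items())))
        self.memtable = {}
        self.commit_log.clear()  # flushed writes no longer need replay


node = Node(memtable_limit=2)
node.write(1, "first_row", timestamp=1)
node.write(2, "second_row", timestamp=2)   # reaches the limit, triggers a flush
node.write(3, "third_row", timestamp=3)
print(len(node.sstables), node.memtable)
```

If the process crashed before a flush, replaying `commit_log` into an empty memtable would recover the writes that had not yet reached an SSTable.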
Read Path
Cassandra read path is the process followed by a Cassandra node to retrieve data in response to a read operation. The read path has more steps than the write path. Actions performed to serve a read request are as follows:
- The coordinator generates a hash using the partition key and gathers the replica nodes which are responsible for storing the data.
- The coordinator checks if replicas required to satisfy the read consistency level are available. If not, an exception is thrown, and the read operation ends.
- The coordinator then sends a read data request to the fastest responding replica; the fastest replica could be the coordinator itself. The fast replica is determined by a dynamic snitch, which keeps track of node latencies dynamically.
- The coordinator then sends a digest request to the replicas of data. The digest is a hash calculated over requested data by the replica nodes.
- The coordinator compares all the digests to determine whether all the replicas have a consistent version of the data. If those are equal, it returns the result obtained from the fastest replica.
If the digests from all the replicas are not equal, it means some replicas do not have the latest version of the data. In this case, read data requests for all the replicas are issued, and the data with the latest timestamp is returned to the client. Also, read repair requests are issued for the replicas which do not have the latest data version.
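The digest comparison can be sketched as follows. This is a simplified, hypothetical model: the first replica in the input is treated as the fastest one, every replica's digest is computed locally from its `(timestamp, value)` pair, and the function returns both the value for the client and the replicas that would need a read repair.

```python
import hashlib


def digest(versioned_value):
    """Hash of a replica's (timestamp, value) pair."""
    return hashlib.md5(repr(versioned_value).encode()).hexdigest()


def coordinate_read(replica_data):
    """replica_data: dict of replica -> (timestamp, value).

    Returns (value for the client, replicas needing read repair).
    """
    replicas = list(replica_data)
    fastest = replicas[0]  # assumed to be the fastest replica
    digests = {digest(replica_data[replica]) for replica in replicas}
    if len(digests) == 1:
        return replica_data[fastest][1], []
    # Digest mismatch: fetch full data and pick the newest version.
    newest = max(replica_data.values(), key=lambda version: version[0])
    stale = [replica for replica in replicas if replica_data[replica] != newest]
    return newest[1], stale


print(coordinate_read({"n1": (1, "old"), "n2": (2, "new"), "n3": (2, "new")}))
```

Sending digests instead of full rows keeps the common, consistent case cheap; full data is only transferred when the hashes disagree.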
Components involved in a read operation on a node:
- Row cache: This is a cache for frequently read data rows, also referred to as hot data. It stores a complete data row which can be returned directly to the client if requested by a read operation. This is an optional feature and works best if there are a small number of hot rows which can fit in the row cache.
- Partition key cache: This component caches the partition index entries per table which are frequently used. In other words, it stores the location of partitions that are commonly queried but not the complete rows. This feature is used by default in Cassandra, but it can be optimized more.
- Bloom filter: A bloom filter is a data structure that indicates if a data partition could be included in a given SSTable. The positive result returned by a bloom filter can be a false signal, but the negative results are always accurate. Hence it saves a lot of seek-time for read operations.
- Partition index and summary: A partition index contains the offset of all partitions for their location in SSTable. The partition summary is a summary of the index. These components enable locating a partition exactly in an SSTable rather than scanning data.
- Memtable: Memtable is an in-memory representation of SSTables. If a data partition is present in memtable, it can be directly read for specific data rows and returned to the client.
- Compression offset map: This is the map for locating data in SSTables when it is compressed on-disk.
- SSTable: The on-disk data structure which holds all the data once flushed from memory.
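Of the components above, the bloom filter is the easiest to illustrate in isolation. The following Python sketch is a generic bloom filter, not Cassandra's implementation; the bit-array size, the number of hash functions, and the use of SHA-256 are arbitrary choices for the example.

```python
import hashlib


class BloomFilter:
    def __init__(self, size_bits=1024, hash_count=3):
        self.size_bits = size_bits
        self.hash_count = hash_count
        self.bits = 0  # an int used as a bit array

    def _positions(self, key):
        """Derive hash_count bit positions from the key."""
        for seed in range(self.hash_count):
            data = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(data[:8], "big") % self.size_bits

    def add(self, key):
        for position in self._positions(key):
            self.bits |= 1 << position

    def might_contain(self, key):
        """False means definitely absent; True means possibly present."""
        return all((self.bits >> position) & 1 for position in self._positions(key))


sstable_filter = BloomFilter()
for partition_key in (1, 2, 3):
    sstable_filter.add(partition_key)
print(sstable_filter.might_contain(2))
```

A negative answer lets a read skip the SSTable entirely, while a positive answer only means the SSTable is worth checking through the key cache or partition index.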
Anatomy of Read Operation on a Node
- Cassandra checks the row cache for data presence. If present, the data is returned, and the request ends.
- The request then checks the bloom filters. If a bloom filter indicates that the data may be present in an SSTable, Cassandra continues to look for the required partition in that SSTable.
- The key cache is checked for the partition key presence. The cache hit provides an offset for the partition in SSTable. This offset is then used to retrieve the partition, and the request completes.
- Cassandra continues to seek the partition in the partition summary and partition index. These structures also provide the partition offset in an SSTable which is then used to retrieve the partition and return. The caches are updated if present with the latest data read.
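The lookup order in these steps can be sketched in Python. This is a deliberately simplified, hypothetical model: a plain set stands in for each bloom filter, the partition summary and index are collapsed into one dict, the memtable is left out, and the function returns the first SSTable hit instead of merging versions across SSTables.

```python
def read_partition(key, row_cache, key_cache, sstables):
    """Return (data, where it was found) following the steps above."""
    if key in row_cache:
        return row_cache[key], "row cache"
    for name, sstable in sstables.items():
        if key not in sstable["bloom"]:       # definitely not in this SSTable
            continue
        offset = key_cache.get((name, key))
        source = "key cache"
        if offset is None:
            offset = sstable["index"].get(key)
            source = "partition index"
            if offset is None:                # bloom filter false positive
                continue
            key_cache[(name, key)] = offset   # remember the offset for next time
        return sstable["data"][offset], source
    return None, "not found"


sstables = {
    "sstable-1": {
        "bloom": {1, 2, 9},                   # 9 simulates a false positive
        "index": {1: 0, 2: 1},
        "data": ["first_row", "second_row"],
    }
}
row_cache, key_cache = {}, {}
print(read_partition(1, row_cache, key_cache, sstables))
print(read_partition(1, row_cache, key_cache, sstables))
```

The first read of a partition goes through the index and populates the key cache, so a repeated read of the same partition finds its offset without consulting the index again.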
Conclusion
Cassandra architecture is uniquely designed to provide scalability, reliability, and performance. It is based on distributed system architecture and operates on the CAP theorem. Cassandra’s unique architecture needs careful configuration and tuning. It is essential to understand the components in order to use Cassandra efficiently.