Apache Cassandra Connector
The bundled Cassandra Connector included in Kafka Connect is an open-source connector developed by Lenses.io under the Apache 2.0 license. The documentation below describes how to configure it on your Instaclustr Managed Kafka Connect cluster. More details on the Connector can be found at https://docs.lenses.io/connectors/sink/cassandra.html and https://docs.lenses.io/connectors/source/cassandra.html.
The Connector uses JSON as the data format for messages read from Kafka and uses JSON insert functionality to insert the rows into Cassandra. A similar conversion occurs in the other direction. However, the Cassandra Source Connector is not covered under the Instaclustr Support policy, as the approach this Connector uses to read data from Cassandra is not suitable for a production deployment. If used, it can lead to significant performance issues, and the read query eventually breaks when it spans multiple partitions.
An example use case for this Connector is taking a stream of data from IoT devices and moving it to long-term storage for later analysis. In this scenario, we can use the sink connector to take the IoT data from the Kafka cluster and push it into the Cassandra database.
The connectors use a specific query language called KCQL, which is specified in the connector config for inserting into Cassandra (sink) or selecting rows from Cassandra (source). Keep in mind that the connectors transfer the data in its entirety between Cassandra and Kafka, so there is no filtering capability. This is reflected in KCQL, which has no WHERE clause.
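As an illustration, the two KCQL statements used in this article's connector configs look like this: a sink statement mapping a Kafka topic to a Cassandra table, and a source statement mapping a table to a topic.

```
-- Sink: write records from a Kafka topic into a Cassandra table
INSERT INTO orders SELECT * FROM orders-topic

-- Source: publish rows from a Cassandra table to a Kafka topic
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
```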
Setting up the connector is straightforward: the main properties we need to set are connect.cassandra.kcql, which specifies the query, and the connection properties for Cassandra. There are more concrete examples below, where we discuss the source and sink in more detail.
For a running example, we assume that we have the following:
- A keyspace demo in the Cassandra database.
- A table orders in the Cassandra database with a column created of type TIMEUUID.
- A Kafka instance with topic orders-topic.
Example CQL queries and the command to set up the Kafka topic described above:
```sql
CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

use demo;

create table orders (
    id int,
    created timeuuid,
    product varchar,
    qty int,
    price float,
    PRIMARY KEY (id, created)
) WITH CLUSTERING ORDER BY (created asc);
```

```shell
./kafka-topics.sh --bootstrap-server <bootstrap servers> --command-config kafka.properties \
    --topic orders-topic --create --replication-factor 3 --partitions 3
```
Make sure that you can access your Kafka, Kafka Connect, and Cassandra clusters. Additionally, ensure that the Kafka Connect cluster can communicate with the Cassandra cluster; pay attention to firewall rules.
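As a quick sanity check before wiring up the connector, you can verify TCP reachability from the Kafka Connect host to Cassandra's native transport port (9042 by default) and to the Kafka brokers. A minimal sketch in Python; the host names in the comments are placeholders:

```python
import socket

def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Placeholder hosts -- substitute your own cluster addresses:
# print(can_connect("<Cassandra IP>", 9042))   # Cassandra native transport
# print(can_connect("<Kafka broker>", 9092))   # Kafka broker
```

This only proves the port is reachable; authentication and TLS issues can still surface when the connector starts.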
Sink Connector
This connector is used to write to a Cassandra database. For full descriptions of the options, consult https://docs.lenses.io/connectors/sink/cassandra.html. This is an example of the connector config that reads data from a topic named orders-topic in Kafka and pushes it into the Cassandra database described above.
```json
{
  "name": "cassandra-sink",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "tasks.max": "3",
    "topics": "orders-topic",
    "connect.cassandra.error.policy": "NOOP",
    "connect.cassandra.kcql": "INSERT INTO orders SELECT * FROM orders-topic",
    "connect.cassandra.port": "9042",
    "connect.cassandra.key.space": "demo",
    "connect.cassandra.contact.points": "<Cassandra comma delimited IPs>",
    "connect.cassandra.username": "<Cassandra user>",
    "connect.cassandra.password": "<Cassandra password>"
  }
}
```
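A config like this is registered by POSTing it to the Kafka Connect REST API (POST /connectors). The sketch below builds that request with the Python standard library; the Connect URL is a placeholder, the config is abbreviated, and any authentication your cluster requires is omitted:

```python
import json
import urllib.request

def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build the POST /connectors request that registers a connector config."""
    return urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# The sink config from above, abbreviated; placeholders as in the article.
sink = {
    "name": "cassandra-sink",
    "config": {
        "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
        "tasks.max": "3",
        "topics": "orders-topic",
        "connect.cassandra.kcql": "INSERT INTO orders SELECT * FROM orders-topic",
    },
}

req = build_register_request("https://<kafka-connect-host>:8083", sink)
# urllib.request.urlopen(req)  # uncomment to send against a live cluster
```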
To test this, create a Kafka console producer and produce some values as JSON strings. For example:
```shell
./kafka-console-producer.sh --broker-list <brokers> --producer.config kafka.properties --topic orders-topic
>{"id":1,"created":"2debd3a0-b13e-11ea-aaee-c7c88277b9ea","price":94.2,"product":"OP-DAX-P-20150201-95.7","qty":100}
>{"id":2,"created":"35bd5270-b13e-11ea-aaee-c7c88277b9ea","price":99.5,"product":"OP-DAX-C-20150201-100","qty":100}
>{"id":3,"created":"3be6eda0-b13e-11ea-aaee-c7c88277b9ea","price":150.0,"product":"FU-KOSPI-C-20150201-100","qty":200}
```
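The created values above are version-1 (time-based) UUIDs, which is what the TIMEUUID column expects. If you generate test records programmatically, Python's uuid.uuid1() produces compatible values. A sketch (make_order is a hypothetical helper; the field names match the orders table):

```python
import json
import uuid

def make_order(order_id: int, product: str, qty: int, price: float) -> str:
    """Build one order record as a JSON string; 'created' is a time-based
    (version-1) UUID, matching the TIMEUUID column in the orders table."""
    return json.dumps({
        "id": order_id,
        "created": str(uuid.uuid1()),
        "price": price,
        "product": product,
        "qty": qty,
    })

msg = make_order(1, "OP-DAX-P-20150201-95.7", 100, 94.2)
```

Each JSON string built this way can be pasted into (or piped to) the console producer shown above.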
And check that the produced values have been written to Cassandra:
```
select * from demo.orders;

 id | created                              | price | product                 | qty
----+--------------------------------------+-------+-------------------------+-----
  1 | 2debd3a0-b13e-11ea-aaee-c7c88277b9ea |  94.2 |  OP-DAX-P-20150201-95.7 | 100
  2 | 35bd5270-b13e-11ea-aaee-c7c88277b9ea |  99.5 |   OP-DAX-C-20150201-100 | 100
  3 | 3be6eda0-b13e-11ea-aaee-c7c88277b9ea |   150 | FU-KOSPI-C-20150201-100 | 200
```
Using Other Converters
By default, our Kafka Connect instance uses org.apache.kafka.connect.storage.StringConverter. We can set the connector to use JsonConverter instead by adding the key.converter and value.converter lines to the configuration for the connector:
```json
{
  "name": "cassandra-source",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "tasks.max": "3",
    "connect.cassandra.error.policy": "NOOP",
    "connect.cassandra.kcql": "INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID",
    "connect.cassandra.port": "9042",
    "connect.cassandra.key.space": "demo",
    "connect.cassandra.contact.points": "<Cassandra comma delimited IPs>",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cassandra.username": "<Cassandra user>",
    "connect.cassandra.password": "<Cassandra password>"
  }
}
```
And it will yield the following output:
```
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"id"},{"type":"string","optional":true,"field":"created"},{"type":"float","optional":true,"field":"price"},{"type":"string","optional":true,"field":"product"},{"type":"int32","optional":true,"field":"qty"}],"optional":false,"name":"demo.orders"},"payload":{"id":1,"created":"fc7a34d0-b1d1-11ea-aaee-c7c88277b9ea","price":94.2,"product":"OP-DAX-P-20150201-95.7","qty":100}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"id"},{"type":"string","optional":true,"field":"created"},{"type":"float","optional":true,"field":"price"},{"type":"string","optional":true,"field":"product"},{"type":"int32","optional":true,"field":"qty"}],"optional":false,"name":"demo.orders"},"payload":{"id":2,"created":"018344d0-b1d2-11ea-aaee-c7c88277b9ea","price":99.5,"product":"OP-DAX-C-20150201-100","qty":100}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"id"},{"type":"string","optional":true,"field":"created"},{"type":"float","optional":true,"field":"price"},{"type":"string","optional":true,"field":"product"},{"type":"int32","optional":true,"field":"qty"}],"optional":false,"name":"demo.orders"},"payload":{"id":3,"created":"04c131c0-b1d2-11ea-aaee-c7c88277b9ea","price":150.0,"product":"FU-KOSPI-C-20150201-100","qty":200}}
```
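With JsonConverter and schemas enabled, each record is wrapped in a schema/payload envelope as shown above. A downstream consumer typically peels off the payload; a minimal sketch using the first record from the output:

```python
import json

# First record from the JsonConverter output above, verbatim.
raw = '{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"id"},{"type":"string","optional":true,"field":"created"},{"type":"float","optional":true,"field":"price"},{"type":"string","optional":true,"field":"product"},{"type":"int32","optional":true,"field":"qty"}],"optional":false,"name":"demo.orders"},"payload":{"id":1,"created":"fc7a34d0-b1d1-11ea-aaee-c7c88277b9ea","price":94.2,"product":"OP-DAX-P-20150201-95.7","qty":100}}'

envelope = json.loads(raw)
row = envelope["payload"]                                     # the actual row data
fields = [f["field"] for f in envelope["schema"]["fields"]]   # column names from the schema
```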
We can also use the AvroConverter if we are using Schema Registry. Add the following lines to the configuration for the connector:
```json
{
  "name": "cassandra-source",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "tasks.max": "3",
    "connect.cassandra.kcql": "INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID",
    "connect.cassandra.port": "9042",
    "connect.cassandra.key.space": "demo",
    "connect.cassandra.contact.points": "<Cassandra comma delimited IPs>",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://<'Schema Registry User:Password'@'Schema Registry Host:Port'>",
    "key.converter.basic.auth.credentials.source": "URL",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://<'Schema Registry User:Password'@'Schema Registry Host:Port'>",
    "value.converter.basic.auth.credentials.source": "URL",
    "connect.cassandra.username": "<Cassandra user>",
    "connect.cassandra.password": "<Cassandra password>"
  }
}
```
And it will yield the following output:
```
{"id":{"int":1},"created":{"string":"Thu May 05 13:24:22 CEST 2016"},"price":{"float":94.2},"product":{"string":"DAX-P-20150201-95.7"},"qty":{"int":100}}
{"id":{"int":2},"created":{"string":"Thu May 05 13:26:21 CEST 2016"},"price":{"float":99.5},"product":{"string":"OP-DAX-C-20150201-100"},"qty":{"int":100}}
{"id":{"int":3},"created":{"string":"Thu May 05 13:26:44 CEST 2016"},"price":{"float":150.0},"product":{"string":"FU-KOSPI-C-20150201-100"},"qty":{"int":200}}
```