Elasticsearch Connector
This document is based on https://docs.lenses.io/connectors/sink/elastic6.html. One use of this connector is to incorporate Elasticsearch into an existing streaming solution, turning each Kafka message into an Elasticsearch document for further querying.
The connector uses a query language called KCQL, specified in the connector config, to control how data is inserted into Elasticsearch. Keep in mind that the connector transfers data in its entirety between Kafka and Elasticsearch, so there is no filtering capability; this is reflected in the fact that KCQL has no WHERE clause.
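For illustration, KCQL statements for this sink generally look like the following. This is a sketch based on the Lenses documentation; index and topic names are placeholders, and the exact forms supported depend on the connector version:

```sql
-- Write every record from a topic into an index
INSERT INTO index_1 SELECT * FROM orders-topic

-- Upsert selected fields, using the "id" field as the document id
UPSERT INTO index_1 SELECT id, random_field FROM orders-topic PK id
```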
Setting up the connector is straightforward: the main properties are connect.elastic.kcql, which specifies the query, and the Elasticsearch connection properties. There are more concrete examples below, where we discuss the sink connector in more detail.
For a running example, we assume that we have the following:
- An Elasticsearch instance.
- A Kafka instance with the topic orders-topic.
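To confirm the Elasticsearch instance is reachable before wiring up the connector, a quick check (assuming basic auth and port 9201, as used in the rest of this document):

```shell
curl -u <elasticsearch user:password> "https://<elasticsearch IP>:9201"
```

This should return a small JSON document with the cluster name and version.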
An example command to set up the Kafka topic as above (note the flag is --partitions, not --partition):

./kafka-topics.sh --bootstrap-server <ip>:9092 --topic orders-topic --create --replication-factor 3 --partitions 3
Sink Connector
This connector writes data to an Elasticsearch instance. For full descriptions of the options, consult https://docs.lenses.io/connectors/sink/elastic6.html. The following example connector config reads data from the Kafka topic orders-topic and pushes it into the Elasticsearch instance described above.
{
  "name" : "elastic-sink",
  "config" : {
    "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector",
    "tasks.max" : 3,
    "topics" : "orders-topic",
    "connect.elastic.hosts" : "<ES IP>",
    "connect.elastic.port" : 9201,
    "connect.elastic.kcql" : "INSERT INTO index_1 SELECT * FROM orders-topic",
    "connect.elastic.use.http.username" : "<ES User>",
    "connect.elastic.use.http.password" : "<ES Password>"
  }
}
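The config above can be registered with the Kafka Connect REST API. This assumes Connect's default port 8083 and that the JSON has been saved as elastic-sink.json; both are assumptions, not part of the original setup:

```shell
# Register the connector with the Kafka Connect worker
curl -X POST -H "Content-Type: application/json" \
     --data @elastic-sink.json \
     "http://<connect host>:8083/connectors"
```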
To test this, create a Kafka console producer and produce some values as JSON strings. For example:
./kafka-console-producer.sh --broker-list <ip>:9092 --topic orders-topic
>{"id":1,"random_field" : "foo"}
>{"id":2,"random_field" : "bar"}
>{"id":3,"random_field" : "baz"}
Then check that the produced values have been written to Elasticsearch, e.g.,
curl -u <elasticsearch user:password> "https://<elasticsearch IP>:9201/_search" | jq
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 4,
    "successful": 4,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 582,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "index_1",
        "_type": "index_1",
        "_id": "orders-topic-0-0",
        "_score": 1,
        "_source": {
          "id": 1,
          "random_field": "foo"
        }
      },
      {
        "_index": "index_1",
        "_type": "index_1",
        "_id": "orders-topic-0-1",
        "_score": 1,
        "_source": {
          "id": 2,
          "random_field": "bar"
        }
      },
      {
        "_index": "index_1",
        "_type": "index_1",
        "_id": "orders-topic-0-2",
        "_score": 1,
        "_source": {
          "id": 3,
          "random_field": "baz"
        }
      },
      …
    ]
  }
}
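Note the _id values in the output: with no PK specified in the KCQL, the connector appears to derive each document id from the Kafka record coordinates as <topic>-<partition>-<offset>. A minimal sketch of that observed scheme (the function name is ours, not the connector's):

```python
def doc_id(topic: str, partition: int, offset: int) -> str:
    """Build an Elasticsearch document id from Kafka record coordinates,
    matching the _id values seen in the search output above."""
    return f"{topic}-{partition}-{offset}"

# The three records produced above land at offsets 0..2 of partition 0:
print(doc_id("orders-topic", 0, 0))  # orders-topic-0-0
print(doc_id("orders-topic", 0, 2))  # orders-topic-0-2
```

This is why re-producing the same records does not create duplicates at the same coordinates, but replaying them at new offsets would.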
Note that there might be differences in the exact values.
Using Other Converters
We can also use the AvroConverter if we are using a Schema Registry. Add the value.converter properties to the connector configuration, as in this full example config:
{
  "name" : "elastic-sink-SchemaRegistry",
  "config" : {
    "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector",
    "tasks.max" : 3,
    "topics" : "orders-topic",
    "connect.elastic.hosts" : "<ES IP>",
    "connect.elastic.port" : 9201,
    "connect.elastic.kcql" : "INSERT INTO index_6 SELECT * FROM orders-topic",
    "connect.elastic.use.http.username" : "<ES User>",
    "connect.elastic.use.http.password" : "<ES Password>",
    "value.converter" : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url" : "<schema registry url with creds>",
    "value.converter.basic.auth.credentials.source": "URL"
  }
}
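With this setup the producer must write Avro records whose schema is registered in the Schema Registry, rather than raw JSON strings. A hypothetical Avro schema matching the example messages above (the field names come from the JSON examples; the record name Order is our invention):

```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "random_field", "type": "string" }
  ]
}
```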