Working with Apache Kafka Streams API
In this example we will be using the Java Kafka Streams API to count the number of times different words occur in a topic.
Dependencies
Add the kafka-streams package to your application. This package is available from Maven:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.1.0</version>
    </dependency>
Kafka Streams Configuration
Before we can use the Streams API we need to configure a number of things. Create a file named streams.properties with the following content, replacing the bootstrap.servers list with the IP addresses of your cluster:
    # Kafka broker IP addresses to connect to
    bootstrap.servers=54.236.208.78:9092,54.88.137.23:9092,34.233.86.118:9092

    # Name of our Streams application
    application.id=wordcount

    # Values and Keys will be Strings
    default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
    default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde

    # Commit at least every second instead of default 30 seconds
    commit.interval.ms=1000
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
To use the Streams API with Instaclustr Kafka we also need to provide authentication credentials. If your cluster has client ⇆ broker encryption enabled, you will also need to provide encryption information. For more information on using certificates with Kafka and where to find them, see here. Add the following to your streams.properties file, ensuring the password and truststore location are correct:
    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
    ssl.truststore.location = truststore.jks
    ssl.truststore.password = instaclustr
    ssl.protocol=TLS
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-256
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="ickafka" \
        password="64500f38930ddcabf1ca5b99930f9e25461e57ddcc422611cb54883b7b997edf";
If your cluster does not have client ⇆ broker encryption enabled, add the following to your streams.properties file instead, ensuring the password is correct:
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-256
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="ickafka" \
        password="361d4871ff1a5ef58deaf3b887b4898029faee9690e62c549078a1f51f18f755";
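If you would rather not keep the password in streams.properties, the same three SASL settings can be set from code. The following is only a sketch under assumptions: the class ScramSettings, its methods, and the environment variable names KAFKA_USERNAME and KAFKA_PASSWORD are hypothetical, and the code uses nothing beyond java.util.Properties and the property names shown above.

```java
import java.util.Properties;

// Hypothetical helper: builds the SCRAM settings from values supplied at runtime.
final class ScramSettings {

    // Formats the sasl.jaas.config value in the same shape used in streams.properties.
    static String jaasConfig(String username, String password) {
        return String.format(
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            username, password);
    }

    // Adds the three SASL settings to an existing Properties object.
    static void apply(Properties props, String securityProtocol,
                      String username, String password) {
        props.setProperty("security.protocol", securityProtocol);
        props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
        props.setProperty("sasl.jaas.config", jaasConfig(username, password));
    }

    public static void main(String[] args) {
        // KAFKA_USERNAME and KAFKA_PASSWORD are assumed names, not an Instaclustr convention.
        String username = System.getenv().getOrDefault("KAFKA_USERNAME", "ickafka");
        String password = System.getenv().getOrDefault("KAFKA_PASSWORD", "changeme");

        Properties props = new Properties();
        // Use "SASL_SSL" instead when client to broker encryption is enabled.
        apply(props, "SASL_PLAINTEXT", username, password);
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

This sketch does not escape quote characters, so a password containing a double quote would need extra handling.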
Create the Streams application
First, load the properties we defined earlier:
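    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;

    Properties props = new Properties();
    // try-with-resources closes the reader once the properties are loaded
    try (FileReader reader = new FileReader("streams.properties")) {
        props.load(reader);
    } catch (IOException e) {
        e.printStackTrace();
    }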
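A missing setting tends to surface only once the application tries to connect, so it can help to check the loaded properties first. The sketch below is one possible way to do that with the standard library only. The class StreamsPropertiesCheck and its REQUIRED list are hypothetical; the list simply names the keys that the basic configuration in this guide sets, apart from the optional commit.interval.ms.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: loads properties and reports which expected keys are absent.
final class StreamsPropertiesCheck {

    // Keys this word count example relies on (an assumption of this sketch).
    static final List<String> REQUIRED = Arrays.asList(
        "bootstrap.servers", "application.id",
        "default.key.serde", "default.value.serde");

    // Loads properties from any Reader; I/O failures are rethrown unchecked.
    static Properties load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are missing or blank, in REQUIRED order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // In the real application the Reader would be a FileReader on streams.properties.
        String sample = "bootstrap.servers=localhost:9092\napplication.id=wordcount\n";
        Properties props = load(new StringReader(sample));
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

Stopping the application when missingKeys returns a non-empty list gives a clearer error than a later connection or serialization failure.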
Create a new input KStream object on the wordcount-input topic:
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("wordcount-input");
Create the word count KStream that will calculate the number of times each word occurs:
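    // Split on runs of non-word characters; the backslash must be escaped in Java source
    final Pattern pattern = Pattern.compile("\\W+");
    KStream<String, String> counts = source
        // Break each message into lowercase words
        .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
        // Use the word itself as the record key so that records group by word
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey()
        // Keep a running count per word in a state store named CountStore
        .count(Materialized.as("CountStore"))
        // Convert the Long counts to Strings to match the default value serde
        .mapValues(value -> Long.toString(value))
        .toStream();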
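To see what the splitting step hands to the rest of the topology, the tokenization can be tried on its own without Kafka. This is a minimal sketch using only java.util.regex; the class WordSplitSketch and the method words are hypothetical names, and the method applies the same lowercase-then-split logic as the flatMapValues step.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-alone version of the tokenization used in the topology.
final class WordSplitSketch {

    // One or more non-word characters (anything other than letters, digits, underscore).
    private static final Pattern NON_WORD = Pattern.compile("\\W+");

    // Lowercases the message and splits it into words.
    static List<String> words(String value) {
        return Arrays.asList(NON_WORD.split(value.toLowerCase()));
    }

    public static void main(String[] args) {
        // Trailing punctuation is dropped because split discards trailing empty strings.
        System.out.println(words("How much wood could a woodchuck chuck?"));

        // A message that starts with punctuation produces a leading empty string,
        // which the topology would count as a word of its own.
        System.out.println(words("\"Chuck\" said the woodchuck"));
    }
}
```

If empty strings should not be counted, one option is to filter them out before grouping.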
Direct the output from the word count KStream to a topic named wordcount-output:
    counts.to("wordcount-output");
Finally, create and start the KafkaStreams object:
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
Our Streams application is now ready, but we need to do some more setup before we can use it.
Create Input Topic
In this example we are going to use the Streams API to count the occurrences of words in a Kafka topic. Before we can run the Streams application we need to create the topic to read input from. Use the guide here to create a new topic called wordcount-input with 1 partition and a replication factor of 1.
Produce Some Input
First, follow the guide here to set up a Kafka console producer, changing the topic name to the wordcount-input topic.
Once you’ve set up a console producer, send some input to Kafka (preferably with duplicated words so we get word counts greater than 1):
    > How much wood could a woodchuck chuck if a woodchuck could chuck wood? As much wood as a woodchuck could chuck, if a woodchuck could chuck wood.
    >
Start the Streams application
Run the Java Streams application we created earlier. If it is set up correctly, the application won’t produce any output and will keep running until you stop it.
Consume the Output
Now that the Streams application has begun calculating the word counts of our input text, we would like to view the output. To do so, follow the guide here to set up a Kafka console consumer, changing the topic name to the wordcount-output topic.
After some delay (it could take up to a minute for the first output to come through), the consumer should output the word counts produced by our Streams application:
    how 1
    much 2
    as 2
    if 2
    a 4
    woodchuck 4
    could 4
    chuck 4
    wood 4
Feel free to produce some more text to the wordcount-input topic; after some delay the consumer should output a new word count list. For example, if you produced the input “chuck the woodchuck chucked wood”, the console consumer should output:
    chuck 5
    the 1
    woodchuck 5
    chucked 1
    wood 5
Note: the Streams application will only output the counts for words contained in the most recent message.
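The behaviour in the note can be reproduced with a small model that needs no Kafka at all. The sketch below is a simplification under stated assumptions, not a description of Kafka Streams internals: the class RunningWordCount is hypothetical, a plain map stands in for the CountStore state store, and ordering updates by each word's last occurrence is an assumption chosen because it matches the sample output in this guide.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical in-memory model of the word count application's output.
final class RunningWordCount {

    private static final Pattern NON_WORD = Pattern.compile("\\W+");

    // Running totals across all messages (stands in for the CountStore state store).
    private final Map<String, Long> totals = new LinkedHashMap<>();

    // Processes one message and returns "word count" lines only for the words
    // that message contained, ordered by each word's last occurrence.
    List<String> process(String message) {
        Map<String, Long> touched = new LinkedHashMap<>();
        for (String word : NON_WORD.split(message.toLowerCase())) {
            long updated = totals.merge(word, 1L, Long::sum);
            touched.remove(word);       // re-insert so the latest update moves to the end
            touched.put(word, updated);
        }
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> entry : touched.entrySet()) {
            lines.add(entry.getKey() + " " + entry.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        counter.process("a woodchuck could chuck wood");
        // Only the words of the second message are reported, with their running totals.
        for (String line : counter.process("chuck the wood")) {
            System.out.println(line);
        }
    }
}
```

Words that do not appear in a new message keep their totals in the map but are not reported again, which is why the second list in the example above is shorter than the first.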