Use Apache Kafka with Java
In this example we will be using the official Java client maintained by the Apache Kafka team. A list of alternative Java clients can be found here.
Dependencies
Add the kafka_2.12 package to your application. This package is available in Maven:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.1.0</version>
</dependency>
```
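If your build uses Gradle instead of Maven, the equivalent declaration (same artifact and version; use `compile` instead of `implementation` on older Gradle versions) would be:

```groovy
// build.gradle — same artifact and version as the Maven example above
dependencies {
    implementation 'org.apache.kafka:kafka_2.12:1.1.0'
}
```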
Using client ⇆ broker encryption (SSL)
If you have chosen to enable client ⇆ broker encryption, see here for information on the certificates required to establish an SSL connection to your cluster.
Producing Messages
Client Configuration
Before creating a Kafka producer client, you first need to define the configuration properties for the producer client to use. In this example we provide only the required properties for the producer client. See here for the full list of configuration options.
Create a new file named producer.properties, ensuring the password, truststore location, and bootstrap servers list are correct:
```properties
bootstrap.servers=18.204.134.49:9092,18.208.108.53:9092,34.194.230.138:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location=truststore.jks
ssl.truststore.password=instaclustr
ssl.protocol=TLS
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="ickafka" \
    password="361d4871ff1a5ef58deaf3b887b4898029faee9690e62c549078a1f51f18f755";
```
Note: If your cluster does not have client ⇆ broker encryption enabled your file should look like this:
```properties
bootstrap.servers=18.204.134.49:9092,18.208.108.53:9092,34.194.230.138:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="ickafka" \
    password="361d4871ff1a5ef58deaf3b887b4898029faee9690e62c549078a1f51f18f755";
```
Make sure the password and bootstrap servers list are correct.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
Java Code
Now that we have a properties file we can create a Kafka producer.
First, load the properties:
```java
Properties kafkaProps = new Properties();

try {
    kafkaProps.load(new FileReader("producer.properties"));
} catch (IOException e) {
    e.printStackTrace();
}
```
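As a side note, a try-with-resources form closes the reader automatically, even if `load` throws. This is only a sketch: it writes a minimal sample file first so it runs standalone, whereas in practice you would load the producer.properties created above.

```java
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class LoadProps {
    public static void main(String[] args) throws IOException {
        // Self-contained demo only: write a minimal properties file.
        // In the tutorial this file is the producer.properties created above.
        Files.write(Paths.get("producer.properties"),
                "bootstrap.servers=localhost:9092\n".getBytes());

        Properties kafkaProps = new Properties();
        // try-with-resources closes the FileReader even if load() throws
        try (FileReader reader = new FileReader("producer.properties")) {
            kafkaProps.load(reader);
        }
        System.out.println(kafkaProps.getProperty("bootstrap.servers"));
        // prints: localhost:9092
    }
}
```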
Once we’ve loaded the producer properties, we can create the producer itself:
```java
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
```
Before we can send a message we first need to create a ProducerRecord object:
```java
ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-topic", "my-key", "test");
```
Now that we have a ProducerRecord object, sending it is trivial:
```java
try {
    producer.send(record);
    producer.flush();
} catch (Exception e) {
    e.printStackTrace();
}
```
Note: We use the producer’s flush method here to ensure the message gets sent before the program exits. In normal operation the producer will send messages in batches when it has either accumulated a certain number of messages, or has waited a certain amount of time.
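That batching behaviour can be tuned through producer configuration. For example, adding the following to producer.properties makes the producer accumulate up to 32 KB per batch and wait up to 10 ms for more records before sending (the values here are illustrative, not recommendations):

```properties
# Maximum batch size in bytes (broker client default is 16384); illustrative value
batch.size=32768
# How long the producer waits for more records before sending a batch (default 0 ms)
linger.ms=10
```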
Consuming Messages
Client Configuration
As in the producer example, before creating a Kafka consumer client, you first need to define the configuration properties for the consumer client to use. In this example we provide only the required properties for the consumer client. See here for the full list of configuration options.
Create a new file named consumer.properties, ensuring the password, truststore location, and bootstrap servers list are correct:
```properties
bootstrap.servers=18.204.134.49:9092,18.208.108.53:9092,34.194.230.138:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.protocol=TLS
ssl.truststore.location=truststore.jks
ssl.truststore.password=instaclustr
group.id=my-group
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="ickafka" \
    password="361d4871ff1a5ef58deaf3b887b4898029faee9690e62c549078a1f51f18f755";
```
Note: If your cluster does not have client ⇆ broker encryption enabled your file should look like this:
```properties
bootstrap.servers=18.204.134.49:9092,18.208.108.53:9092,34.194.230.138:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=my-group
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="ickafka" \
    password="361d4871ff1a5ef58deaf3b887b4898029faee9690e62c549078a1f51f18f755";
```
Make sure the password and bootstrap servers list are correct.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
Java Code
Now that we have a properties file we can create a Kafka consumer.
First, load the properties:
```java
Properties kafkaProps = new Properties();

try {
    kafkaProps.load(new FileReader("consumer.properties"));
} catch (IOException e) {
    e.printStackTrace();
}
```
Once we’ve loaded the consumer properties, we can create the consumer itself:
```java
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps);
```
Before we can consume messages, we need to subscribe to the topics we wish to receive messages from:
```java
consumer.subscribe(Collections.singleton("my-topic"));
```
Now we are ready to consume messages from Kafka. To consume a single batch of messages, we use the consumer’s poll method:
```java
ConsumerRecords<String, String> records = consumer.poll(100);
```
Combined with a loop, we can continually consume messages from Kafka as they are produced:
```java
try {
    while (true) {
        records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value()));
        }
    }
} finally {
    consumer.close();
}
```
Putting Them Together
Now that we have a consumer and producer set up, it’s time to combine them.
Start the consumer
Start the consumer before starting the producer because by default consumers only consume messages that were produced after the consumer started.
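This default behaviour comes from the consumer's auto.offset.reset setting. If you would rather have a new consumer group start from the beginning of the topic, you can add the following to consumer.properties:

```properties
# Start from the earliest available offset when the group has no committed offset.
# The default, "latest", only reads messages produced after the consumer starts.
auto.offset.reset=earliest
```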
Start the producer
Now that the consumer is set up and ready to consume messages, you can start your producer.
If the consumer and producer are setup correctly the consumer should output the message sent by the producer shortly after it was produced:
```
topic = my-topic, partition = 1, offset = 0, key = my-key, value = test
```