Use Apache Kafka with C#
There are many Kafka clients for C#. A list of some recommended options can be found here.
Create Cluster
First, you will need a Kafka cluster. If you don’t have one already, just head over to the Instaclustr console and create a free Kafka cluster to test this with.
Full step-by-step instructions to deploy a cluster can be found in this document.
Once you have your cluster running, ensure you add your IP to the firewall, and you are ready to proceed.
Package Dependencies
Add your Kafka package to your application. For example, you can install Confluent.Kafka from within Visual Studio by searching for Confluent.Kafka in the NuGet UI, or by running this command in the Package Manager Console:
    Install-Package Confluent.Kafka

The examples in this guide use ProducerBuilder and ConsumerBuilder, which require Confluent.Kafka 1.0 or later.
Using client ⇆ broker encryption (SSL)
If you have chosen to enable client ⇆ broker encryption on your Kafka cluster, please refer to this document for step-by-step instructions to establish an SSL connection to your Kafka cluster.
Producing Messages
Next, we’ll produce some messages to the Kafka cluster, using a ProducerBuilder. Start with the producer configuration:

    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;

    var config = new ProducerConfig
    {
        BootstrapServers = "<your-IP-port-pairs>",
        SslCaLocation = "/Path-to/cluster-ca-certificate.pem",
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslMechanism = SaslMechanism.ScramSha256,
        SaslUsername = "ickafka",
        SaslPassword = "yourpassword",
    };
Ensure the IP addresses, cluster certificate location and password are correct. If your Kafka cluster does not have client ⇆ broker encryption enabled, your configuration options should look like this:

    BootstrapServers = "<your-IP-port-pairs>",
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.ScramSha256,
    SaslUsername = "ickafka",
    SaslPassword = "yourpassword",

Make sure the IP addresses and password are correct.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
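The <your-IP-port-pairs> placeholder stands for a comma-separated list of host:port pairs, which is the format Kafka clients expect for bootstrap servers. As a minimal sketch, the helper below builds that string from a list of broker addresses and picks the port according to the note above. The class name, method name and example addresses are hypothetical and not part of Confluent.Kafka.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of Confluent.Kafka): builds the value for
// BootstrapServers from a list of broker IP addresses.
public static class BootstrapServersHelper
{
    // Ports as stated in the note above: 9092 by default, 9093 over the private network.
    public const int PublicPort = 9092;
    public const int PrivatePort = 9093;

    public static string Build(IEnumerable<string> brokerAddresses, bool usePrivateNetwork = false)
    {
        if (brokerAddresses == null)
            throw new ArgumentNullException(nameof(brokerAddresses));

        int port = usePrivateNetwork ? PrivatePort : PublicPort;

        // Drop blank entries and append the port to each address.
        var pairs = brokerAddresses
            .Select(address => address.Trim())
            .Where(address => address.Length > 0)
            .Select(address => $"{address}:{port}")
            .ToList();

        if (pairs.Count == 0)
            throw new ArgumentException("At least one broker address is required.", nameof(brokerAddresses));

        return string.Join(",", pairs);
    }
}
```

For example, BootstrapServersHelper.Build(new[] { "192.0.2.10", "192.0.2.11" }) would produce a value that can be assigned to BootstrapServers in the configuration above. The addresses shown are documentation placeholders; substitute the IPs of your own cluster.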
Now that we have set up the producer configuration, we can create a Producer object:

    using (var p = new ProducerBuilder<Null, string>(config).Build())
Once we have a Producer object we can use it to send a message to Kafka:
    // Note: await requires an async context, such as an async Task Main method.
    using (var p = new ProducerBuilder<Null, string>(config).Build())
    {
        try
        {
            // Send one message and wait for the broker to acknowledge it.
            var dr = await p.ProduceAsync("my-topic", new Message<Null, string> { Value = "test" });
            Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
        }
        catch (ProduceException<Null, string> e)
        {
            Console.WriteLine($"Delivery failed: {e.Error.Reason}");
        }
    }
Note: we move the producer declaration into a using block here so it will be automatically closed cleanly once we’re finished with it.
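Awaiting each ProduceAsync call waits for one acknowledgement before sending the next message. When sending several messages, one alternative is the non-blocking Produce method with a delivery handler, followed by Flush before the producer is disposed. The following is a sketch under that assumption, not the only way to do it; the BatchProducer class, its method names and the "test-N" message values are hypothetical.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical wrapper illustrating Produce + delivery handler + Flush.
public static class BatchProducer
{
    // Hypothetical helper: builds the i-th test message.
    public static Message<Null, string> BuildMessage(int i) =>
        new Message<Null, string> { Value = $"test-{i}" };

    public static void ProduceBatch(ProducerConfig config, string topic, int count)
    {
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            for (int i = 0; i < count; i++)
            {
                // Produce queues the message and returns immediately. The handler
                // runs later, once delivery has succeeded or failed.
                p.Produce(topic, BuildMessage(i), report =>
                {
                    if (report.Error.IsError)
                        Console.WriteLine($"Delivery failed: {report.Error.Reason}");
                    else
                        Console.WriteLine($"Delivered '{report.Message.Value}' to '{report.TopicPartitionOffset}'");
                });
            }

            // Wait up to 10 seconds for queued messages to be delivered
            // before the using block disposes the producer.
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```

Calling BatchProducer.ProduceBatch(config, "my-topic", 10) with the producer configuration from above would queue ten messages. The 10-second flush timeout is an arbitrary choice for illustration.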
Consuming Messages
Messages from Kafka are consumed using a Consumer object.
First, create the consumer configuration:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Confluent.Kafka;

    var config = new ConsumerConfig
    {
        GroupId = "test-consumer-group",
        BootstrapServers = "<your-IP-port-pairs>",
        SslCaLocation = "/PathTO/cluster-ca-certificate.pem",
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslMechanism = SaslMechanism.ScramSha256,
        SaslUsername = "ickafka",
        SaslPassword = "yourpassword",
        AutoOffsetReset = AutoOffsetReset.Latest,
    };
Ensure the IP addresses, cluster certificate location and password are correct. If your Kafka cluster does not have client ⇆ broker encryption enabled, your configuration options should look like this:

    GroupId = "test-consumer-group",
    BootstrapServers = "<your-IP-port-pairs>",
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.ScramSha256,
    SaslUsername = "ickafka",
    SaslPassword = "yourpassword",
    AutoOffsetReset = AutoOffsetReset.Latest,
Make sure the IP addresses and password are correct.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
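One way to check these values before building a client, and to keep the password out of source code, is to read them from environment variables and fail fast when something is missing. The sketch below uses only standard .NET APIs. The KafkaSettings class and the environment variable names in the usage note are hypothetical.

```csharp
using System;
using System.IO;

// Hypothetical helper (not part of Confluent.Kafka): validates connection
// settings before they are passed to a ProducerConfig or ConsumerConfig.
public static class KafkaSettings
{
    // Reads a required setting from an environment variable and throws
    // a descriptive error if it is unset or empty.
    public static string Require(string variableName)
    {
        var value = Environment.GetEnvironmentVariable(variableName);
        if (string.IsNullOrEmpty(value))
            throw new InvalidOperationException($"Environment variable '{variableName}' is not set.");
        return value;
    }

    // Returns the path unchanged if the certificate file exists; otherwise throws.
    public static string RequireFile(string path)
    {
        if (!File.Exists(path))
            throw new FileNotFoundException("Cluster CA certificate not found.", path);
        return path;
    }
}
```

In the configuration, this could be used as SaslPassword = KafkaSettings.Require("KAFKA_PASSWORD") and SslCaLocation = KafkaSettings.RequireFile(KafkaSettings.Require("KAFKA_CA_PATH")), assuming you have chosen those variable names yourself.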
Now that we have the consumer configuration, we can create a Consumer object:

    using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
Before we can consume messages, we need to subscribe to the topics we wish to receive messages from:
    c.Subscribe("my-topic");
Combined with a loop, we can continually consume and output messages from Kafka as they are produced:
    using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        c.Subscribe("my-topic");

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives or Ctrl+C cancels the token.
                    var cr = c.Consume(cts.Token);
                    Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            c.Close();
        }
    }
Note: we move the consumer declaration into a using block here so it will be automatically closed cleanly once we’re finished with it.
Putting Them Together
Now that we have a consumer and a producer set up, it’s time to combine them.
Start the consumer
Start the consumer before starting the producer. The consumer configuration above sets AutoOffsetReset to Latest, so a consumer group with no committed offsets only receives messages that were produced after the consumer started.
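This start-up order matters because the consumer configuration sets AutoOffsetReset = AutoOffsetReset.Latest. If you would rather have a new consumer group read messages already in the topic, a variation along the following lines may work. It assumes the same SSL settings as the consumer configuration above. The group name is hypothetical, and AutoOffsetReset only takes effect when the group has no committed offsets for the partition.

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    // Hypothetical new group name: a group that has already committed offsets
    // resumes from them and ignores AutoOffsetReset.
    GroupId = "test-consumer-group-from-start",
    BootstrapServers = "<your-IP-port-pairs>",
    SslCaLocation = "/PathTO/cluster-ca-certificate.pem",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.ScramSha256,
    SaslUsername = "ickafka",
    SaslPassword = "yourpassword",
    // Start from the earliest available message instead of only new ones.
    AutoOffsetReset = AutoOffsetReset.Earliest,
};
```

With this setting the start-up order of consumer and producer matters less, at the cost of re-reading everything still retained in the topic the first time the group runs.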
Start the producer
Now that the consumer is set up and ready to consume messages, you can start your producer.
If the consumer and producer are set up correctly, the consumer should output the message sent by the producer shortly after it was produced.

    Consumed message 'test' at: 'my-topic [[0]] @0'.