Apache Kafka® “Kongo” 5.3: Kongo Streams Example

Introduction

In the previous blog we tried a simple Kafka Streams application for Cluedo. It relied on a KTable to count the number of people in each room. In this blog, we’ll extend this idea and develop a more complex streams application to keep track of the weight of goods in trucks for our Kongo IoT application. Here’s the latest complete Kongo code with the new streams application (OverloadStreams.java).

1. Overload!

(Source: Illustration 51591027 © Mariayunira | Dreamstime.com)

Noah has a transportation problem. He has lots of animals of various weights, and only a small boat to transport them from the shore to the Ark. The boat can only transport 8000kg (8.8 US tons) at a time and is initially empty (we’ll ignore Noah’s weight).

Animal Weight (kg)
elephant 8000
rhino 5000
hippo 4000
giraffe 2000
bull 1000
bear 1000
croc 1000
lion 200

We’ll use this transport scenario to demonstrate the new Kongo truck overload streams application. In terms of the development and test process, it makes sense to write and test a standalone, self-contained Kafka Streams application before eventual integration with other Kafka and non-Kafka components. Here’s a sample of the trace messages you can easily get with Kafka Streams print() statements.
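
For example, a print() can be attached at any point in the topology to trace records as they flow through. A minimal sketch (the stream variable name is illustrative):

import org.apache.kafka.streams.kstream.Printed;

// Trace every record flowing through this stream to stdout; without an explicit label
// the processor name (e.g. KSTREAM-SOURCE-0000000000) is used as the prefix.
rfidLoadStream.print(Printed.toSysOut());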

Animal weight and boat maximum payload events populate KTables:

[KTABLE-SOURCE-0000000006]: elephant, (8000<-null)

[KTABLE-SOURCE-0000000006]: rhino, (5000<-null)

[KTABLE-SOURCE-0000000006]: hippo, (4000<-null)

[KTABLE-SOURCE-0000000006]: giraffe, (2000<-null)

[KTABLE-SOURCE-0000000006]: bull, (1000<-null)

[KTABLE-SOURCE-0000000006]: bear, (1000<-null)

[KTABLE-SOURCE-0000000006]: croc, (1000<-null)

[KTABLE-SOURCE-0000000006]: lion, (200<-null)

Boat maximum payload:

[KTABLE-SOURCE-0000000014]: boat, (8000<-null)

As animals are loaded and unloaded, the total weight of animals on the boat is tracked and overload warnings are produced:

Noah enthusiastically loads both the elephant and the rhino onto the boat:

[KSTREAM-SOURCE-0000000000]: elephant, boat

[KSTREAM-LEFTJOIN-0000000020]: boat, 8000

The elephant weighs 8000kg so the boat has an 8000kg load:

[KTABLE-SOURCE-0000000010]: boat, (8000<-null)

[KSTREAM-SOURCE-0000000000]: rhino, boat

The rhino weighs 5000kg, which, added to the 8000kg elephant, results in a 13000kg load:

[KSTREAM-LEFTJOIN-0000000020]: boat, 13000

[KTABLE-SOURCE-0000000010]: boat, (13000<-null)

This is too heavy!

[KSTREAM-FILTER-0000000023]: boat, overloaded! 13000 > 8000

Noah unloads the rhino; the elephant remains on the boat:

[KSTREAM-SOURCE-0000000001]: rhino, boat

[KSTREAM-LEFTJOIN-0000000030]: boat, 8000

[KTABLE-SOURCE-0000000010]: boat, (8000<-null)

The boat is no longer overloaded, so Noah can transport the elephant over to the Ark, then make further trips with the giraffe, bull and rhino, and then the hippo, bear, croc, and lion.

For testing, I used the Kafka console producer to generate the input records, but discovered that the default String Serializer can’t be overridden with a Long Serializer, so a workaround is to use String types for values (with conversion to Longs in the streams code).
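
A minimal sketch of that workaround (topic and variable names are illustrative):

// Console-producer test records arrive as <String, String>; parse the value to a Long
// inside the topology rather than relying on a Long deserializer.
KStream<String, String> rawWeights = builder.stream("animal-weights-test");
KStream<String, Long> weights = rawWeights.mapValues(Long::parseLong);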

2. Kongo Truck Overload Streams Application

(Source: Shutterstock)

The goal for this blog was to build a Kafka Streams application that I could easily integrate with the existing Kongo code, and which would add some relevant functionality. Inspired by the Cluedo example, I picked truck overloading to implement. In order to keep track of the weight of goods on each truck and produce a warning message if the weight goes over the limit, we need to know (1) the weight of each good, (2) the maximum allowed payload for each truck, (3) the current weight of goods on each truck, and (4) when goods are loaded onto or unloaded from each truck.

We use KTables to keep track of 1-3, and a combination of DSL operations including join and leftJoin (for KTable lookups to find the weight of incoming goods, the current weight of goods on a truck, and the maximum payload for a truck), plus map, filter, etc. to juggle and change keys/values (which is the tricky bit in practice).

The existing combined rfid event topic is used as input, but because we previously combined load and unload events on the one topic to ensure event order, for simplicity of streams processing we immediately split it into separate load and unload event streams (I’m also currently unsure if there is an elegant way of keeping track of event “types” for later content-based decisions in a streams processor). Here’s the essence of the streams code (complete code in OverloadStreams.java):
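
As the complete code is in OverloadStreams.java, here is just a condensed sketch of the load-event path (the unload path mirrors it, subtracting instead of adding); topic, class, and variable names are illustrative rather than the exact ones used in Kongo:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class OverloadStreamsSketch {

    // "dummy" carrier object so a join result can pass two values to the next DSL operator
    static class ReturnObjectSL {
        final String s;
        final Long l;
        ReturnObjectSL(String s, Long l) { this.s = s; this.l = l; }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // KTables: goods weights, truck maximum payloads, current truck weights
        // (values kept as Strings and parsed to Longs, per the console-producer workaround)
        KTable<String, String> goodsWeight = builder.table("kongo-goods-weight");
        KTable<String, String> truckMaxWeight = builder.table("kongo-trucks-maxweight");
        KTable<String, String> truckWeight = builder.table("kongo-trucks-weight");

        // load events (already split out of the combined rfid topic): <goods, truck>
        KStream<String, String> loadStream = builder.stream("kongo-load-events");

        // 1. look up the weight of the goods being loaded (KTable lookup via a join on the goods key)
        // 2. re-key by truck so the next join matches the truck-keyed tables
        // 3. add the goods weight to the truck's current weight (null if the truck is empty)
        KStream<String, Long> newTruckWeight = loadStream
            .join(goodsWeight, (truck, weight) -> new ReturnObjectSL(truck, Long.parseLong(weight)))
            .map((goods, ro) -> KeyValue.pair(ro.s, ro.l))
            .leftJoin(truckWeight,
                (loaded, current) -> current == null ? loaded : loaded + Long.parseLong(current),
                Joined.with(Serdes.String(), Serdes.Long(), Serdes.String()));

        // write the new weight back to the trucks weight topic (which also feeds the KTable above)
        newTruckWeight.mapValues(v -> v.toString()).to("kongo-trucks-weight");

        // 4. compare against the truck's maximum payload and keep only overloads
        newTruckWeight
            .join(truckMaxWeight, (weight, max) -> new ReturnObjectSL(max, weight),
                Joined.with(Serdes.String(), Serdes.Long(), Serdes.String()))
            .filter((truck, ro) -> ro.l > Long.parseLong(ro.s))
            .mapValues(ro -> "overloaded! " + ro.l + " > " + ro.s)
            .to("kongo-trucks-overloaded");   // illustrative warning topic

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kongo-overload-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), config).start();
    }
}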

Note the use of a “dummy” Java object ReturnObjectSL (with String and Long values) to keep track of the values from a join for use in the next DSL operator.

Because lambda expressions are used in the DSL, the argument variable names can be anything meaningful that you like (argument types are inferred from the context).

I managed to use the built-in SerDes for String and Long types for most of the code. However, the Kongo code has a custom class that we need to use, RFIDEvent. Did we need to write a SerDes for it? The documentation says there are three options for creating SerDes:

  1. Write a serializer for your data type T by implementing org.apache.kafka.common.serialization.Serializer.
  2. Write a deserializer for T by implementing org.apache.kafka.common.serialization.Deserializer.
  3. Write a serde for T by implementing
    a. org.apache.kafka.common.serialization.Serde, which you either do manually (see existing SerDes in the previous section), or
    b. by leveraging helper functions in Serdes such as Serdes.serdeFrom(Serializer<T>, Deserializer<T>).

Option 3b meant that it was trivial to reuse the existing Kafka (de)serializer for RFIDEvent (RFIDEventSerializer, which implements both Serializer and Deserializer) to create a SerDes like this:
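
A sketch of that one-liner, assuming (as above) that RFIDEventSerializer implements both Serializer<RFIDEvent> and Deserializer<RFIDEvent>:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Build a Streams SerDes directly from the existing Kongo (de)serializer class.
Serde<RFIDEvent> rfidEventSerde =
        Serdes.serdeFrom(new RFIDEventSerializer(), new RFIDEventSerializer());

// It can then be used wherever a Serde is needed, e.g. when consuming the rfid topic
// (topic name illustrative):
// builder.stream("kongo-rfid", Consumed.with(Serdes.String(), rfidEventSerde));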

For completeness I’ve updated our previous Kafka data conversion diagram to show this creation path for Streams SerDes: SerDes can be created from existing Kafka (De-)serializers.


Here’s some alternative code (replacing (A) and (B) above) using branch (split) to produce the rfidLoadStream and rfidUnLoadStream. Branch works like a case/switch statement: it produces as many output streams as there are cases, as an array of streams with the same key/value types as the input stream.
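
A sketch of this approach, assuming RFIDEvent exposes some way to distinguish load from unload events (the isLoadEvent() accessor here is hypothetical):

// rfidStream: the KStream<String, RFIDEvent> read from the combined rfid event topic.
// Split it into load and unload streams with branch(): branch 0 receives records
// matching the first predicate, branch 1 the second.
@SuppressWarnings("unchecked")
KStream<String, RFIDEvent>[] branches = rfidStream.branch(
        (key, event) -> event.isLoadEvent(),     // hypothetical accessor
        (key, event) -> !event.isLoadEvent()
);
KStream<String, RFIDEvent> rfidLoadStream = branches[0];
KStream<String, RFIDEvent> rfidUnLoadStream = branches[1];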

To integrate the new Streams code with the existing Kongo code the following minor changes were made:

  • Three new topics were created:
    • Kongo-goods-weight
      • <goods, weight>, the weight of each Goods in the system
      • output from Kongo, input to streams for KTable
    • Kongo-trucks-maxweight
      • <truck, maxWeight>, the maximum payload that a truck can transport
      • output from Kongo, input to streams for KTable
    • Kongo-trucks-weight
      • <truck, weight>, the dynamically changing weight of Goods on each truck
      • input to and output from the streams application
  • During the simulation world creation phase:
    • After Goods are created, <goods, weight> records are sent to the new topic Kongo-goods-weight using a new producer.
    • After Trucks are created, a random maximum payload weight (up to 150 tonnes) is computed for each truck, and <truck, maxWeight> records are sent to the new topic kongo-trucks-maxweight using a new producer (a producer sketch follows this list).
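
A sketch of that producer step (class and accessor names like Truck and getName() are illustrative, as is the producer configuration):

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Producer<String, String> producer = new KafkaProducer<>(props);

// one <truck, maxWeight> record per truck; the random maximum payload (up to 150 tonnes,
// i.e. 150,000 kg) is sent as a String per the earlier serialization workaround
for (Truck truck : trucks) {
    long maxWeightKg = ThreadLocalRandom.current().nextLong(1, 150_001);
    producer.send(new ProducerRecord<>("kongo-trucks-maxweight", truck.getName(), Long.toString(maxWeightKg)));
}
producer.flush();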

The new version of Kongo including the complete streams code is available here.

3. Topology Exceptions!

M.C. Escher’s impossible waterfall, maybe not so impossible in this video?!

What can go wrong with topologies? Are some topologies impossible? Can you have a cycle in a DAG? During development of the streams code I came across (1) an example of a TopologyException, and (2) a case which perhaps should have been one (but wasn’t). To help understand what’s going on (the inputs, outputs, and processing steps) in a Kafka Streams application, you can print out the Topology with just a few lines of code:

final Topology top = builder.build();
System.out.println(top.describe());

The Topology for our streams application is:

Note that the sub-topologies result from key repartitioning, which makes it slightly tricky to work out the exact correspondence to the code (i.e. some of the processors are split by repartitioning into different sub-topologies). A nicer visualization is available using this handy online Kafka Streams Topology Visualizer. Just copy the above ASCII topology into the tool and you get this:

This Topology worked. However, during debugging and testing I produced a “TopologyException: Invalid topology!” error:

Exception in thread "main" org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic KSTREAM-MAP-0000000011-repartition has already been registered by another source.

After a few moments of existential panic, worrying that I had been turned into a Klein bottle or some other impossible topological oddity (at least in our limited 3D world), I found that it was just a simple copy/paste error: one processor was trying to read from the same stream as another. See the red arrow:

However, I noticed that a source was also shared earlier in the code (green arrows above), so I wondered why it complained here. One theory is that it’s OK to share a source that comes directly from a topic, but not otherwise. Another is that it’s not OK to share a source that is the result of a repartition. Some further experiments may be in order to come up with some “Kafka Streams topology rules”, and maybe even a unified theory of Kafka Streams topologies.

The Kafka Streams documentation says that the topology is a DAG, so in theory it’s not supposed to have cycles (the “A” is for acyclic). But looking closely at this topology there’s something that looks suspiciously like a cycle (orange arrows):

However, there’s no topology exception and it works. The loop results from transforming the trucks weight topic to a KStream and then a KTable. The goods weight and truck weight are then combined to produce the new truck weight value, which is sent to the trucks weight topic (where we started from). A cycle! Why does this work? I suspect it’s because this isn’t a “simple” loop with the same event consumed as produced, i.e. there are transformations and a state store in between the input and the output. Cycles in streams are discussed in the context of other streaming technologies, and it would be useful to clarify this for Kafka Streams as well.

4. Does It Work? Transactional Magic Pixie Dust

The new truck overload streams application worked with the real Kongo simulation data. However, I did notice that (as I had suspected) the truck weights sometimes went negative. I had intentionally not enforced a minimum weight of zero in the streams code to see if this would happen. There are potentially four things happening here.

  1. Maybe the order of events within streams applications isn’t guaranteed, i.e. it’s possible for, say, multiple unload events to be processed in a row (taking a truck weight negative) before the corresponding load events are processed (taking the weight back up to positive again).
  2. Kafka event order is only guaranteed within partitions. How does this work out in practice for streams where multiple topics (with potentially different keys and therefore partitions) are inputs, and where repartitioning happens within the streams application? Related to this: how does topology scaling work? This is briefly covered in the book Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale (“Scaling the Topology”, page 312ff).
  3. The KTable cache is only flushed every 30s by default (the default commit interval), which could result in stale values being read (a configuration sketch follows this list).
  4. Kafka Streams isn’t transactional by default. Ideally, from when the current value of a truck weight is read from the KTable in one thread until the same thread updates the value, we don’t want another thread to be able to change the value (or, using optimistic concurrency, if it is changed we want an exception to be thrown).
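
For (3), the cache flush frequency can be tuned (or record caching disabled altogether) via configuration; a minimal sketch with illustrative values:

// Commit (and flush the KTable cache) every 100ms instead of the 30s default,
// or set the cache size to 0 to disable record caching entirely.
Properties config = new Properties();
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);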

Since Kafka 0.11 there is support for transactions using a new Transactional API, and this link explains the motivation from a streams perspective:

The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.

The best explanation I’ve found of Kafka transactions so far is in this book (ebook available, Chapter 12: Transactions, but Not as We Know Them), which says that if you enable transactions in Kafka Streams then you get atomic operations for free.

How do you turn transactions on for a streams application? There’s a single configuration parameter for delivery semantics and transactions. If the property processing.guarantee is set to “exactly_once” you get transactions (the default is “at_least_once”). In the streams code you set the StreamsConfig processing guarantee like this:

Properties config = new Properties();

config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");

This change successfully prevented truck weights from going negative (although it also changes the commit interval to 100ms, making it less likely to get stale data from the KTable cache). There may also be a small performance hit.

Further Resources

The web tool for visualizing Kafka Streams topologies is here. Note that the direction of the arrows is ambiguous for state stores. It would also be nice to have type information for keys/value on the diagram, and the ability to have a simplified or detailed view.