In this blog, we’ll look at some simple Apache Kafka Streams examples, using the murder mystery game Cluedo as the problem domain.
Dr Black has been murdered in the Billiard Room with a Candlestick! Whodunnit?!
There are six suspects and a mansion with multiple rooms.
The suspects are:
- Miss Scarlet
- Professor Plum
- Mrs Peacock
- Mr Green
- Colonel Mustard
- Mrs White
The rooms are:
- Kitchen
- Conservatory
- Dining Room
- Billiard Room
- Library
- Lounge
Assuming everyone has a means and a motive for murder, we’ll focus on who has an alibi as they move around the mansion.
Example 1: Who’s in the Same Location at the Same Time?
So you can imagine what the rooms look like, here’s the Cluedo mansion in The Sims.
The input stream will be <room, person> records, each giving the new location of a person. We’ll send them using the kafka-console-producer script, typing each record at the > prompt as room:person:
```
./kafka-console-producer.sh --broker-list localhost:9092 --topic cluedo-topic --property "parse.key=true" --property "key.separator=:"
>room1:person1
>room2:person2
```
From this event stream, let’s find out which people are in the same room as someone else. Being in a room with at least one other person gives everyone there an alibi (unless they all did it). This example illustrates Kafka Streams configuration properties, topology building, reading from a topic, a windowed (self) stream-stream join, a filter, and print() (for tracing).
The self join finds all pairs of people who are in the same location at the “same time”, which here means within a 30s sliding window. However, we don’t want anyone providing themselves with an alibi, so if the leftValue and rightValue names from the join match, we produce an empty ("") joined value and filter it out. Here’s the initial streams code:
```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

// Written against the Kafka Streams 1.x API. Later releases replace print() with
// print(Printed.toSysOut()), and use StreamJoined and Duration-based JoinWindows
// factories for stream-stream joins. The class name is arbitrary.
public class CluedoApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cluedo-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // input topic is <room, person>
        // as people move around they change room locations
        KStream<String, String> roomPersonStream = builder.stream("cluedo-topic");

        KStream<String, String> sameRoomsStream = roomPersonStream
            .join(roomPersonStream,
                (leftValue, rightValue) ->
                    leftValue.equals(rightValue) ? "" : leftValue + " in room with " + rightValue,
                // Sliding window duration
                JoinWindows.of(TimeUnit.SECONDS.toMillis(30)),
                // optional, default uses Serdes from config
                Joined.with(
                    Serdes.String(),   // key
                    Serdes.String(),   // left value
                    Serdes.String()))  // right value
            // only keep join pairs which are different people
            .filter((key, value) -> (value.length() >= 1));

        roomPersonStream.print();
        sameRoomsStream.print();

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
        // do stuff, and then eventually...
        streams.close();
    }
}
```
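To make the join semantics concrete, here is a plain-Java simulation (no Kafka dependency, so it is not the Kafka Streams API) of what the windowed self join and filter above produce. The Move type and the timestamps are hypothetical. The pairing rule is my reading of the topology and output shown in this post: same room key, timestamps at most 30 s apart, both orderings emitted, and same-person pairs dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the windowed self join plus filter (hypothetical names).
final class SelfJoinSketch {

    // One <room, person> record together with its (hypothetical) timestamp.
    static final class Move {
        final long timestampMs;
        final String room;
        final String person;

        Move(long timestampMs, String room, String person) {
            this.timestampMs = timestampMs;
            this.room = room;
            this.person = person;
        }
    }

    // Pairs each new record with every earlier record that has the same key (room)
    // and a timestamp at most windowMs away, in both orders, roughly as the two
    // sides of the self join do. Same-person pairs are dropped, mirroring the ""
    // joiner result followed by the filter.
    static List<String> sameRoomPairs(List<Move> moves, long windowMs) {
        List<Move> seen = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Move current : moves) {
            for (Move earlier : seen) {
                boolean sameRoom = earlier.room.equals(current.room);
                boolean inWindow = Math.abs(current.timestampMs - earlier.timestampMs) <= windowMs;
                if (sameRoom && inWindow && !earlier.person.equals(current.person)) {
                    out.add(current.room + ", " + current.person + " in room with " + earlier.person);
                    out.add(current.room + ", " + earlier.person + " in room with " + current.person);
                }
            }
            seen.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Move> moves = List.of(
                new Move(0, "Kitchen", "MissScarlet"),
                new Move(0, "Conservatory", "ProfessorPlum"),
                new Move(40_000, "Conservatory", "ProfessorPlum"),
                new Move(41_000, "Conservatory", "MrsPeacock"),
                new Move(42_000, "Kitchen", "MrsWhite"));
        // The t=0 records fall outside the 30 s window of the later ones, so this
        // prints only the two Conservatory pairs; MrsWhite gets no pairing.
        sameRoomPairs(moves, 30_000).forEach(System.out::println);
    }
}
```

Like the join itself, the sketch only sees arrivals. Someone who walked out of a room within the last 30 s still counts as being there.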
Let’s try it out. These are the initial locations of the suspects:
```
>Kitchen:MissScarlet
>Conservatory:ProfessorPlum
>DiningRoom:MrsPeacock
>BilliardRoom:MrGreen
>Library:ColonelMustard
>Lounge:MrsWhite
```
After 30 seconds have elapsed (the join window duration), let’s move people to these locations:
```
>Conservatory:ProfessorPlum
>Conservatory:MrsPeacock
>Conservatory:MrGreen
>Library:ColonelMustard
>Library:MissScarlet
>Kitchen:MrsWhite
>Lounge:MrsWhite
```
We get the following output, showing all the pairs of people in the same locations. Everyone appears to have an alibi except Mrs White, who can move between empty rooms without anyone else knowing. Note that the order of the printed events isn’t always strictly preserved:
```
// Initial <room, people> locations
[KSTREAM-SOURCE-0000000000]: Kitchen, MissScarlet
[KSTREAM-SOURCE-0000000000]: Conservatory, ProfessorPlum
[KSTREAM-SOURCE-0000000000]: DiningRoom, MrsPeacock
[KSTREAM-SOURCE-0000000000]: BilliardRoom, MrGreen
[KSTREAM-SOURCE-0000000000]: Library, ColonelMustard
[KSTREAM-SOURCE-0000000000]: Lounge, MrsWhite
// after 30s some people move locations (SOURCE)
// people in same room are printed (FILTER)
[KSTREAM-SOURCE-0000000000]: Conservatory, ProfessorPlum
[KSTREAM-FILTER-0000000006]: Conservatory, MrsPeacock in room with ProfessorPlum
[KSTREAM-FILTER-0000000006]: Conservatory, ProfessorPlum in room with MrsPeacock
[KSTREAM-SOURCE-0000000000]: Conservatory, MrsPeacock
[KSTREAM-FILTER-0000000006]: Conservatory, MrGreen in room with ProfessorPlum
[KSTREAM-FILTER-0000000006]: Conservatory, MrGreen in room with MrsPeacock
[KSTREAM-FILTER-0000000006]: Conservatory, ProfessorPlum in room with MrGreen
[KSTREAM-FILTER-0000000006]: Conservatory, MrsPeacock in room with MrGreen
[KSTREAM-SOURCE-0000000000]: Conservatory, MrGreen
[KSTREAM-SOURCE-0000000000]: Library, ColonelMustard
[KSTREAM-FILTER-0000000006]: Library, MissScarlet in room with ColonelMustard
[KSTREAM-FILTER-0000000006]: Library, ColonelMustard in room with MissScarlet
[KSTREAM-SOURCE-0000000000]: Library, MissScarlet
// MrsWhite can move between empty rooms without an alibi:
[KSTREAM-SOURCE-0000000000]: Kitchen, MrsWhite
[KSTREAM-SOURCE-0000000000]: Lounge, MrsWhite
```
Note that no one has an alibi at the start as everyone is in different rooms.
Example 2: How Many People Are in Each Room?
Let’s extend Example 1 with a KTable that keeps track of some state. We’ll count the number of people in each room as they move around. If someone is in a room by themselves, then they don’t have an alibi and are a prime suspect! How do we get from the <room, person> stream to a <room, count> table?
The final count table will have <room, count> data. However, to count the number of people per room we need to know which room each person is in. This is also state information, so we need an intermediate table, created from the input topic, that holds <person, room> state. Once we have this table, we group it by the reversed key/value pair, <room, person>, and count the number of people per room. The input topic has <room, person> records, so we first need to reverse them. One way to do this is to read the topic into a stream, swap the keys and values, and write the result to another topic, which is then read back as a table. We can’t turn the stream directly into a table, because in the version used here there’s no operator to transform a KStream into a KTable (see the table in the previous blog); KStream#toTable() was only added later, in Kafka 2.5.
```java
// Continues Example 1 (same builder and roomPersonStream).
// Also needs: import org.apache.kafka.streams.KeyValue;
//             import org.apache.kafka.streams.kstream.KTable;

// reverse key/values for cluedo-topic and send back to reverse-cluedo-topic for use below
KStream<String, String> personRoomStream = roomPersonStream
    .map((key, value) -> KeyValue.pair(value, key));
personRoomStream.to("reverse-cluedo-topic");
personRoomStream.print();

// create table with <person, room> updates
// this table maintains current location of each person
KTable<String, String> peopleRoomTable = builder.table("reverse-cluedo-topic");

// Keep track of how many people in each room
// <room, count>
KTable<String, Long> roomCountTable =
    // <person, room>
    peopleRoomTable
        // group by <room, person> so we can count how many people per room next
        .groupBy((key, value) -> KeyValue.pair(value, key))
        // now count and produce <room, count>
        .count();

peopleRoomTable.print();
roomCountTable.print();
```
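Here is a plain-Java sketch (not Kafka code) of what groupBy(...).count() maintains. When a person’s room changes, conceptually the old <room, person> pair is subtracted from the old room’s count and the new pair is added to the new room’s count. The class and method names are hypothetical. Replaying the same input as the run below gives the same final counts as its log.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of a <person, room> table regrouped by room and counted.
final class RoomCountSketch {
    private final Map<String, String> personRoom = new HashMap<>();    // <person, room>
    private final Map<String, Long> roomCount = new LinkedHashMap<>();  // <room, count>

    // Apply one <person, room> update, as if it arrived from reverse-cluedo-topic.
    void update(String person, String newRoom) {
        String oldRoom = personRoom.put(person, newRoom);
        if (oldRoom != null) {
            // "subtract" the person's previous <room, person> pair from the old room
            roomCount.merge(oldRoom, -1L, Long::sum);
        }
        // "add" the new <room, person> pair to the new room (a same-room update nets to 0)
        roomCount.merge(newRoom, 1L, Long::sum);
    }

    long count(String room) {
        return roomCount.getOrDefault(room, 0L);
    }

    public static void main(String[] args) {
        RoomCountSketch table = new RoomCountSketch();
        String[][] updates = {
            {"MissScarlet", "Kitchen"}, {"ProfessorPlum", "Conservatory"},
            {"MrsPeacock", "DiningRoom"}, {"MrGreen", "BilliardRoom"},
            {"ColonelMustard", "Library"}, {"MrsWhite", "Lounge"},
            {"ProfessorPlum", "Conservatory"}, {"MrsPeacock", "Conservatory"},
            {"MrGreen", "Conservatory"}, {"ColonelMustard", "Library"},
            {"MissScarlet", "Library"}, {"MrsWhite", "Kitchen"}, {"MrsWhite", "Lounge"}};
        for (String[] u : updates) {
            table.update(u[0], u[1]);
        }
        // prints {Kitchen=0, Conservatory=3, DiningRoom=0, BilliardRoom=0, Library=2, Lounge=1}
        System.out.println(table.roomCount);
    }
}
```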
Here’s the output with the same input as before:
```
// Initial person room locations (SOURCE) and reverse key/value pairs (MAP):
[KSTREAM-SOURCE-0000000000]: Kitchen, MissScarlet
[KSTREAM-MAP-0000000009]: MissScarlet, Kitchen
[KSTREAM-SOURCE-0000000000]: Conservatory, ProfessorPlum
[KSTREAM-MAP-0000000009]: ProfessorPlum, Conservatory
[KSTREAM-SOURCE-0000000000]: DiningRoom, MrsPeacock
[KSTREAM-MAP-0000000009]: MrsPeacock, DiningRoom
[KSTREAM-SOURCE-0000000000]: BilliardRoom, MrGreen
[KSTREAM-MAP-0000000009]: MrGreen, BilliardRoom
[KSTREAM-SOURCE-0000000000]: Library, ColonelMustard
[KSTREAM-MAP-0000000009]: ColonelMustard, Library
[KSTREAM-SOURCE-0000000000]: Lounge, MrsWhite
[KSTREAM-MAP-0000000009]: MrsWhite, Lounge
// state table for <person, room> updates, initial locations set
[KTABLE-SOURCE-0000000014]: MissScarlet, (Kitchen<-null)
[KTABLE-SOURCE-0000000014]: ProfessorPlum, (Conservatory<-null)
[KTABLE-SOURCE-0000000014]: MrsPeacock, (DiningRoom<-null)
[KTABLE-SOURCE-0000000014]: MrGreen, (BilliardRoom<-null)
[KTABLE-SOURCE-0000000014]: ColonelMustard, (Library<-null)
[KTABLE-SOURCE-0000000014]: MrsWhite, (Lounge<-null)
// People move around (SOURCE and MAP for reverse)
// People in same room as someone else printed (FILTER)
[KSTREAM-SOURCE-0000000000]: Conservatory, ProfessorPlum
[KSTREAM-MAP-0000000009]: ProfessorPlum, Conservatory
[KSTREAM-FILTER-0000000006]: Conservatory, MrsPeacock in room with ProfessorPlum
[KSTREAM-FILTER-0000000006]: Conservatory, ProfessorPlum in room with MrsPeacock
[KSTREAM-SOURCE-0000000000]: Conservatory, MrsPeacock
[KSTREAM-MAP-0000000009]: MrsPeacock, Conservatory
[KSTREAM-FILTER-0000000006]: Conservatory, MrGreen in room with ProfessorPlum
[KSTREAM-FILTER-0000000006]: Conservatory, MrGreen in room with MrsPeacock
[KSTREAM-FILTER-0000000006]: Conservatory, ProfessorPlum in room with MrGreen
[KSTREAM-FILTER-0000000006]: Conservatory, MrsPeacock in room with MrGreen
[KSTREAM-SOURCE-0000000000]: Conservatory, MrGreen
[KSTREAM-MAP-0000000009]: MrGreen, Conservatory
[KSTREAM-SOURCE-0000000000]: Library, ColonelMustard
[KSTREAM-MAP-0000000009]: ColonelMustard, Library
[KSTREAM-FILTER-0000000006]: Library, MissScarlet in room with ColonelMustard
[KSTREAM-FILTER-0000000006]: Library, ColonelMustard in room with MissScarlet
[KSTREAM-SOURCE-0000000000]: Library, MissScarlet
[KSTREAM-MAP-0000000009]: MissScarlet, Library
[KSTREAM-SOURCE-0000000000]: Kitchen, MrsWhite
[KSTREAM-MAP-0000000009]: MrsWhite, Kitchen
// count table: Each room has 1 person which was correct at start but is now lagging
[KTABLE-AGGREGATE-0000000019]: Kitchen, (1<-null)
[KTABLE-AGGREGATE-0000000019]: Conservatory, (1<-null)
[KTABLE-AGGREGATE-0000000019]: DiningRoom, (1<-null)
[KTABLE-AGGREGATE-0000000019]: BilliardRoom, (1<-null)
[KTABLE-AGGREGATE-0000000019]: Library, (1<-null)
[KTABLE-AGGREGATE-0000000019]: Lounge, (1<-null)
// person location state table updated:
[KTABLE-SOURCE-0000000014]: ProfessorPlum, (Conservatory<-Conservatory)
[KTABLE-SOURCE-0000000014]: MrsPeacock, (Conservatory<-DiningRoom)
[KTABLE-SOURCE-0000000014]: MrGreen, (Conservatory<-BilliardRoom)
[KTABLE-SOURCE-0000000014]: ColonelMustard, (Library<-Library)
[KTABLE-SOURCE-0000000014]: MissScarlet, (Library<-Kitchen)
[KTABLE-SOURCE-0000000014]: MrsWhite, (Kitchen<-Lounge)
[KSTREAM-SOURCE-0000000000]: Lounge, MrsWhite
[KSTREAM-MAP-0000000009]: MrsWhite, Lounge
// final room people counts:
[KTABLE-AGGREGATE-0000000019]: DiningRoom, (0<-null)
[KTABLE-AGGREGATE-0000000019]: BilliardRoom, (0<-null)
[KTABLE-AGGREGATE-0000000019]: Conservatory, (3<-null)
[KTABLE-AGGREGATE-0000000019]: Library, (2<-null)
[KTABLE-AGGREGATE-0000000019]: Lounge, (0<-null)
[KTABLE-AGGREGATE-0000000019]: Kitchen, (1<-null)
[KTABLE-SOURCE-0000000014]: MrsWhite, (Lounge<-Kitchen)
[KTABLE-AGGREGATE-0000000019]: Kitchen, (0<-null)
[KTABLE-AGGREGATE-0000000019]: Lounge, (1<-null)
```
Dr Black should stick to the Conservatory or Library to avoid being bumped off.
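Notice the delay in the KTable output. This is due to state store caching: table updates are buffered and only forwarded downstream when the cache is flushed, which happens on each commit (commit.interval.ms, 30s by default) or when the cache fills up.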
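Below is a simplified plain-Java model of that caching behaviour. It is not Kafka’s actual implementation, and the class is hypothetical. With caching on, updates are held per key, a newer value for the same key replaces the buffered one, and only the latest value per key is forwarded on commit. This can explain why Conservatory’s count in the Example 2 output jumps from 1 straight to 3. Setting cache.max.bytes.buffering to 0 corresponds to the caching-off branch, where every update is forwarded immediately.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a state-store record cache (hypothetical, not Kafka's code).
final class RecordCacheSketch<K, V> {
    private final boolean cachingEnabled;
    private final Map<K, V> dirty = new LinkedHashMap<>();
    private final List<String> forwarded = new ArrayList<>();

    RecordCacheSketch(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
    }

    // A state-store update, e.g. a new count for a room.
    void put(K key, V value) {
        if (!cachingEnabled) {
            forwarded.add(key + ", " + value); // caching off: every update goes downstream
            return;
        }
        dirty.remove(key);     // re-insert so flush order follows the most recent update
        dirty.put(key, value); // overwrite any value still buffered for this key
    }

    // Flush on commit (in Kafka Streams, every commit.interval.ms, 30 s by default).
    void commit() {
        dirty.forEach((k, v) -> forwarded.add(k + ", " + v));
        dirty.clear();
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        for (boolean caching : new boolean[] {true, false}) {
            RecordCacheSketch<String, Long> counts = new RecordCacheSketch<>(caching);
            counts.put("Conservatory", 1L);
            counts.put("Conservatory", 2L);
            counts.put("Conservatory", 3L);
            counts.commit();
            System.out.println("caching=" + caching + " -> " + counts.forwarded());
        }
    }
}
```

With caching on, only the final count of 3 reaches downstream. With caching off, all three updates do. Kafka Streams also flushes when the cache is full, which this sketch leaves out.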
Example 3: Who Doesn’t Have an Alibi?
Everyone needs an alibi (Source: Getty Images)
It would be useful to make the people with no alibis explicit and create a stream of potential suspects. This example illustrates a stream-table join with a filter. We’ll join the <room, person> stream with the <room, count> table giving a <room, person, count> join, which we filter for people with no alibi:
```java
// Stream Table join example (continues the code above; uses roomCountTable from Example 2)
KStream<String, String> noAlibiMessage =
    // <room, person> stream
    roomPersonStream
        // <room, person> stream joined with <room, count> table
        .join(roomCountTable,
            (leftValue, rightValue) -> rightValue >= 1 ? "" : leftValue + " has no alibi")
        // keep only the "no alibi" messages
        .filter((key, value) -> (value.length() >= 1));

noAlibiMessage.print();
```
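The stream-table join can be modelled in plain Java to show why the timing of table updates matters. Each <room, person> record is joined against whatever the <room, count> table holds when that record is processed, and an inner join emits nothing for a room that has no table row. In the caching-off run below, the stream side of each move is processed before the table update for that same move, so the count excludes the mover; the sketch hard-codes that ordering. The class, the form of the Hall filter, and the starting counts (rooms already having a 0 row, e.g. left over from earlier test runs) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the <room, person> stream joined with the <room, count> table.
final class NoAlibiJoinSketch {

    // Inner join of one record against the table's current contents, then the filter.
    // Returns null when nothing would be emitted.
    static String join(String room, String person, Map<String, Long> roomCounts) {
        Long count = roomCounts.get(room);
        if (count == null) {
            return null; // inner join: no table row for this room, so no output at all
        }
        String joined = count >= 1 ? "" : person + " has no alibi"; // same joiner logic
        return joined.isEmpty() ? null : room + ", " + joined;      // same filter logic
    }

    // Replays moves in order: join first, then (if tableUpdatesVisible) update the table.
    // With tableUpdatesVisible = false the table never changes, like a stale cached table.
    static List<String> replay(List<String[]> moves, Map<String, String> startRooms,
                               Map<String, Long> startCounts, boolean tableUpdatesVisible) {
        Map<String, String> personRoom = new HashMap<>(startRooms);
        Map<String, Long> counts = new HashMap<>(startCounts);
        List<String> out = new ArrayList<>();
        for (String[] move : moves) {
            String room = move[0];
            String person = move[1];
            if (!room.equals("Hall")) { // hypothetical Hall filter, stream side only
                String message = join(room, person, counts);
                if (message != null) {
                    out.add(message);
                }
            }
            if (tableUpdatesVisible) {
                String oldRoom = personRoom.put(person, room);
                if (oldRoom != null) {
                    counts.merge(oldRoom, -1L, Long::sum);
                }
                counts.merge(room, 1L, Long::sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> start = new HashMap<>();
        for (String p : List.of("MissScarlet", "ProfessorPlum", "MrsPeacock",
                "MrGreen", "ColonelMustard", "MrsWhite")) {
            start.put(p, "Hall");
        }
        Map<String, Long> counts = Map.of("Hall", 6L, "Conservatory", 0L, "Library", 0L, "Kitchen", 0L);
        List<String[]> moves = List.of(
                new String[] {"Conservatory", "ProfessorPlum"},
                new String[] {"Conservatory", "MrsPeacock"},
                new String[] {"Conservatory", "MrGreen"},
                new String[] {"Library", "ColonelMustard"},
                new String[] {"Library", "MissScarlet"},
                new String[] {"Kitchen", "MrsWhite"});
        System.out.println(replay(moves, start, counts, true));  // three suspects
        System.out.println(replay(moves, start, counts, false)); // all six flagged
    }
}
```

When table updates are visible, the sketch flags ProfessorPlum, ColonelMustard and MrsWhite. When the table never updates, all six are flagged.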
How well did this work? As noted above, no one has an alibi at the start, so I changed things a bit to prevent spurious “no alibi” messages: everyone now starts out in the Hall, and Hall records are filtered out of the stream before the join.
```
// Everyone is cosy in the Hall, move to other rooms.
[KSTREAM-SOURCE-0000000000]: Conservatory, ProfessorPlum
[KSTREAM-MAP-0000000009]: ProfessorPlum, Conservatory
[KSTREAM-FILTER-0000000024]: Conservatory, ProfessorPlum has no alibi
[KSTREAM-SOURCE-0000000000]: Conservatory, MrsPeacock
[KSTREAM-FILTER-0000000007]: Conservatory, MrsPeacock in room with ProfessorPlum
[KSTREAM-FILTER-0000000007]: Conservatory, ProfessorPlum in room with MrsPeacock
[KSTREAM-MAP-0000000009]: MrsPeacock, Conservatory
[KSTREAM-FILTER-0000000024]: Conservatory, MrsPeacock has no alibi
[KSTREAM-SOURCE-0000000000]: Conservatory, MrGreen
[KSTREAM-FILTER-0000000007]: Conservatory, MrGreen in room with ProfessorPlum
[KSTREAM-FILTER-0000000007]: Conservatory, MrGreen in room with MrsPeacock
[KSTREAM-FILTER-0000000007]: Conservatory, ProfessorPlum in room with MrGreen
[KSTREAM-FILTER-0000000007]: Conservatory, MrsPeacock in room with MrGreen
[KSTREAM-MAP-0000000009]: MrGreen, Conservatory
[KSTREAM-FILTER-0000000024]: Conservatory, MrGreen has no alibi
[KSTREAM-SOURCE-0000000000]: Library, ColonelMustard
[KSTREAM-MAP-0000000009]: ColonelMustard, Library
[KSTREAM-FILTER-0000000024]: Library, ColonelMustard has no alibi
[KSTREAM-SOURCE-0000000000]: Library, MissScarlet
[KSTREAM-FILTER-0000000007]: Library, MissScarlet in room with ColonelMustard
[KSTREAM-FILTER-0000000007]: Library, ColonelMustard in room with MissScarlet
[KSTREAM-MAP-0000000009]: MissScarlet, Library
[KSTREAM-FILTER-0000000024]: Library, MissScarlet has no alibi
[KSTREAM-SOURCE-0000000000]: Kitchen, MrsWhite
[KSTREAM-MAP-0000000009]: MrsWhite, Kitchen
[KSTREAM-FILTER-0000000024]: Kitchen, MrsWhite has no alibi
[KTABLE-SOURCE-0000000014]: ProfessorPlum, (Conservatory<-Hall)
[KTABLE-SOURCE-0000000014]: MrsPeacock, (Conservatory<-Hall)
[KTABLE-SOURCE-0000000014]: MrGreen, (Conservatory<-Hall)
[KTABLE-SOURCE-0000000014]: ColonelMustard, (Library<-Hall)
[KTABLE-SOURCE-0000000014]: MissScarlet, (Library<-Hall)
[KTABLE-SOURCE-0000000014]: MrsWhite, (Kitchen<-Hall)
[KTABLE-AGGREGATE-0000000019]: Hall, (5<-null)
[KTABLE-AGGREGATE-0000000019]: Conservatory, (1<-null)
[KTABLE-AGGREGATE-0000000019]: Hall, (4<-null)
[KTABLE-AGGREGATE-0000000019]: Conservatory, (2<-null)
[KTABLE-AGGREGATE-0000000019]: Hall, (3<-null)
[KTABLE-AGGREGATE-0000000019]: Conservatory, (3<-null)
[KTABLE-AGGREGATE-0000000019]: Hall, (2<-null)
[KTABLE-AGGREGATE-0000000019]: Library, (1<-null)
[KTABLE-AGGREGATE-0000000019]: Hall, (1<-null)
[KTABLE-AGGREGATE-0000000019]: Library, (2<-null)
[KTABLE-AGGREGATE-0000000019]: Hall, (0<-null)
[KTABLE-AGGREGATE-0000000019]: Kitchen, (1<-null)
```
According to this version, everyone is under suspicion, as no one has an alibi. Why? It appears that the KTables are not updated until after the “no alibi” messages are produced. I suspected this was due to state store caching, so I turned off the cache to see what would happen. The property setting is:
```java
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```
```
// Everyone starts out in the Hall and then moves to other rooms incrementally.
[KSTREAM-SOURCE-0000000000]: Conservatory, ProfessorPlum
[KSTREAM-MAP-0000000009]: ProfessorPlum, Conservatory
// ProfessorPlum has no Alibi as he moved into the empty Conservatory
[KSTREAM-FILTER-0000000024]: Conservatory, ProfessorPlum has no alibi
[KTABLE-SOURCE-0000000014]: ProfessorPlum, (Conservatory<-Hall)
[KTABLE-AGGREGATE-0000000019]: Hall, (5<-null)
[KTABLE-AGGREGATE-0000000019]: Conservatory, (1<-null)
[KSTREAM-SOURCE-0000000000]: Conservatory, MrsPeacock
[KSTREAM-FILTER-0000000007]: Conservatory, MrsPeacock in room with ProfessorPlum
[KSTREAM-FILTER-0000000007]: Conservatory, ProfessorPlum in room with MrsPeacock
[KSTREAM-MAP-0000000009]: MrsPeacock, Conservatory
[KTABLE-SOURCE-0000000014]: MrsPeacock, (Conservatory<-Hall)
[KTABLE-AGGREGATE-0000000019]: Hall, (4<-null)
[KTABLE-AGGREGATE-0000000019]: Conservatory, (2<-null)
[KSTREAM-SOURCE-0000000000]: Conservatory, MrGreen
[KSTREAM-FILTER-0000000007]: Conservatory, MrGreen in room with ProfessorPlum
[KSTREAM-FILTER-0000000007]: Conservatory, MrGreen in room with MrsPeacock
[KSTREAM-FILTER-0000000007]: Conservatory, ProfessorPlum in room with MrGreen
[KSTREAM-FILTER-0000000007]: Conservatory, MrsPeacock in room with MrGreen
[KSTREAM-MAP-0000000009]: MrGreen, Conservatory
[KTABLE-SOURCE-0000000014]: MrGreen, (Conservatory<-Hall)
[KTABLE-AGGREGATE-0000000019]: Hall, (3<-null)
[KTABLE-AGGREGATE-0000000019]: Conservatory, (3<-null)
[KSTREAM-SOURCE-0000000000]: Library, ColonelMustard
[KSTREAM-MAP-0000000009]: ColonelMustard, Library
// ColonelMustard has no alibi as he moves into the empty Library
[KSTREAM-FILTER-0000000024]: Library, ColonelMustard has no alibi
[KTABLE-SOURCE-0000000014]: ColonelMustard, (Library<-Hall)
[KTABLE-AGGREGATE-0000000019]: Hall, (2<-null)
[KTABLE-AGGREGATE-0000000019]: Library, (1<-null)
[KSTREAM-SOURCE-0000000000]: Library, MissScarlet
[KSTREAM-FILTER-0000000007]: Library, MissScarlet in room with ColonelMustard
[KSTREAM-FILTER-0000000007]: Library, ColonelMustard in room with MissScarlet
[KSTREAM-MAP-0000000009]: MissScarlet, Library
[KTABLE-SOURCE-0000000014]: MissScarlet, (Library<-Hall)
[KTABLE-AGGREGATE-0000000019]: Hall, (1<-null)
[KTABLE-AGGREGATE-0000000019]: Library, (2<-null)
[KSTREAM-SOURCE-0000000000]: Kitchen, MrsWhite
[KSTREAM-MAP-0000000009]: MrsWhite, Kitchen
// MrsWhite has no alibi as first (and only) person in Kitchen
[KSTREAM-FILTER-0000000024]: Kitchen, MrsWhite has no alibi
[KTABLE-SOURCE-0000000014]: MrsWhite, (Kitchen<-Hall)
[KTABLE-AGGREGATE-0000000019]: Hall, (0<-null)
[KTABLE-AGGREGATE-0000000019]: Kitchen, (1<-null)
```
ProfessorPlum, ColonelMustard, and MrsWhite have no alibis, so this is working better.
But this may still not be “correct”. We are not checking whether someone is left as the last person in a room after everyone else has moved out, and how can we detect that when everything is event driven? We could check when that person eventually moves to another room, but what if they never move?
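Here is one possible answer to that question, which the post itself does not implement. Keep the occupants of each room as state, and re-check the room a person leaves as well as the room they enter. The person left behind is then flagged by the other person’s move-out event, so they never need to move themselves. In Kafka Streams this state might live in an aggregated KTable or a Processor API state store. The sketch below uses plain Java maps instead, and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java sketch: detect people left alone when someone else moves out.
final class LastPersonSketch {
    private final Map<String, String> personRoom = new HashMap<>();       // <person, room>
    private final Map<String, Set<String>> occupants = new HashMap<>();   // <room, people>

    // Applies one <room, person> move and returns who is now alone because of it:
    // anyone left on their own in the old room, then the mover if the new room was empty.
    List<String> move(String room, String person) {
        List<String> nowAlone = new ArrayList<>();
        String oldRoom = personRoom.put(person, room);
        if (oldRoom != null && !oldRoom.equals(room)) {
            Set<String> leftBehind = occupants.get(oldRoom);
            leftBehind.remove(person);
            if (leftBehind.size() == 1) {
                nowAlone.add(leftBehind.iterator().next());
            }
        }
        Set<String> inRoom = occupants.computeIfAbsent(room, r -> new TreeSet<>());
        inRoom.add(person);
        if (inRoom.size() == 1) {
            nowAlone.add(person);
        }
        return nowAlone;
    }

    public static void main(String[] args) {
        LastPersonSketch mansion = new LastPersonSketch();
        System.out.println(mansion.move("Library", "ColonelMustard")); // [ColonelMustard]
        System.out.println(mansion.move("Library", "MissScarlet"));    // []
        System.out.println(mansion.move("Kitchen", "ColonelMustard")); // [MissScarlet, ColonelMustard]
    }
}
```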
Topology
n dimensional space has complex topologies
(Source: https://en.wikipedia.org/wiki/N-dimensional_sequential_move_puzzle#)
I was curious to see what the Topology of the final example looked like. Here’s the code to print it out (simple) and the result (not so simple, but probably simpler than the above n-dimensional space puzzle):
```java
final Topology top = builder.build();
System.out.println(top.describe());
```
```
Topology
Sub-topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [cluedo-topic-0])
      --> KSTREAM-WINDOWED-0000000002, KSTREAM-FILTER-0000000022, KSTREAM-MAP-0000000009, KSTREAM-WINDOWED-0000000003, KSTREAM-PRINTER-0000000001
    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])
      --> KSTREAM-JOINTHIS-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])
      --> KSTREAM-JOINOTHER-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000022 (stores: [])
      --> KSTREAM-JOIN-0000000023
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])
      --> KSTREAM-MERGE-0000000006
      <-- KSTREAM-WINDOWED-0000000003
    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])
      --> KSTREAM-MERGE-0000000006
      <-- KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-JOIN-0000000023 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000016])
      --> KSTREAM-FILTER-0000000024
      <-- KSTREAM-FILTER-0000000022
    Processor: KSTREAM-MAP-0000000009 (stores: [])
      --> KSTREAM-PRINTER-0000000011, KSTREAM-SINK-0000000010
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000006 (stores: [])
      --> KSTREAM-FILTER-0000000007
      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005
    Source: KSTREAM-SOURCE-0000000018 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000016-repartition])
      --> KTABLE-AGGREGATE-0000000019
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      --> KSTREAM-PRINTER-0000000008
      <-- KSTREAM-MERGE-0000000006
    Processor: KSTREAM-FILTER-0000000024 (stores: [])
      --> KSTREAM-PRINTER-0000000025
      <-- KSTREAM-JOIN-0000000023
    Processor: KTABLE-AGGREGATE-0000000019 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000016])
      --> KSTREAM-PRINTER-0000000021
      <-- KSTREAM-SOURCE-0000000018
    Processor: KSTREAM-PRINTER-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PRINTER-0000000008 (stores: [])
      --> none
      <-- KSTREAM-FILTER-0000000007
    Processor: KSTREAM-PRINTER-0000000011 (stores: [])
      --> none
      <-- KSTREAM-MAP-0000000009
    Processor: KSTREAM-PRINTER-0000000021 (stores: [])
      --> none
      <-- KTABLE-AGGREGATE-0000000019
    Processor: KSTREAM-PRINTER-0000000025 (stores: [])
      --> none
      <-- KSTREAM-FILTER-0000000024
    Sink: KSTREAM-SINK-0000000010 (topic: reverse-cluedo-topic-0)
      <-- KSTREAM-MAP-0000000009
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000013 (topics: [reverse-cluedo-topic-0])
      --> KTABLE-SOURCE-0000000014
    Processor: KTABLE-SOURCE-0000000014 (stores: [reverse-cluedo-topic-0STATE-STORE-0000000012])
      --> KTABLE-SELECT-0000000015, KSTREAM-PRINTER-0000000020
      <-- KSTREAM-SOURCE-0000000013
    Processor: KTABLE-SELECT-0000000015 (stores: [])
      --> KSTREAM-SINK-0000000017
      <-- KTABLE-SOURCE-0000000014
    Processor: KSTREAM-PRINTER-0000000020 (stores: [])
      --> none
      <-- KTABLE-SOURCE-0000000014
    Sink: KSTREAM-SINK-0000000017 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000016-repartition)
      <-- KTABLE-SELECT-0000000015
```
Things to Watch Out For (Lucky they are only tiny)
(Source: Shutterstock)
While playing around with Apache Kafka Streams I made a list of things that caught me out. Here they are:
- Your streams application needs the latest RocksDB jar file (look for error “java.lang.UnsupportedOperationException”)
- When debugging/testing streams applications, ensure you don’t have any other forgotten instances running with the same APPLICATION_ID_CONFIG (the application ID is also used as the consumer group ID). It’s best to get the code to close() the application after each run.
- If you convert a KTable to a KStream and write it to a Kafka topic, ensure that the sink topic is compacted so that it doesn’t grow indefinitely.
- You can’t directly access the record timestamp from the DSL operators, so you can’t do things like compare the order of events. This seems like an odd constraint, but you can access the timestamp from the Processor API (ProcessorContext.timestamp()). You could cheat by putting a copy of the timestamp in the record value.
- When testing/debugging remember that Kafka remembers everything! You may need to reset both the state stores and the streams.
- There are two things to reset: the streams and the state. You can use cleanUp() in your code to reset the local state stores (call it before start() or after close()), and the application reset tool (kafka-streams-application-reset) to reset the streams (however, this may not be the behaviour you want). For the Cluedo application, I wanted to reset the state stores but not the streams (as I didn’t want the streams data replayed each time).
- A useful trick was to use input data to put the state stores back to the start state that I wanted each time. I did this by introducing an extra room, a Train, and moving everyone to and from the Train at the start and end of each test scenario.
- Here’s documentation on resetting the local environment, and a more comprehensive explanation of resetting in the context of reprocessing.
- print() is your friend. Print everything, but remember it’s terminal (just like a candlestick is if used as a blunt instrument).
- Only put DSL code inline with other DSL code, i.e. don’t put any non-DSL code in the middle of a DSL chain (or something bad may happen).
- Changing and reversing keys and values is tricky, but appears to be normal. How do you know which operators do this? For example, join can only change the value, not the key.
- There’s a trade-off between using KStreams and KTables. KStreams are immediate (i.e. an incoming event immediately triggers processing), but some operations require the use of time windows to limit the amount of data included. KTables keep track of state, which can be from an arbitrarily long series of events/time, but if caching is turned on then the results may not be immediate. I.e. use KStreams (where possible) if you care about low latency and using fresh data, and KTables when you don’t mind if the results are a bit fuzzy.
- It’s useful to have some knowledge of Java 8 Streams, Lambdas, and Functional Programming to get the most out of the Apache Kafka Streams DSL (see resources below).
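The timestamp “cheat” from the list above, copying the timestamp into the record value, could look like this in plain Java. The person@timestampMs value format and the helper names are hypothetical. The producer, or a Processor API step that reads ProcessorContext.timestamp(), would have to write values in this format. A ValueJoiner could then compare the times on the two sides of a join.

```java
// Hypothetical value format for this sketch: "person@timestampMs".
final class TimestampedValue {

    static String encode(String person, long timestampMs) {
        return person + "@" + timestampMs;
    }

    static String person(String value) {
        return value.substring(0, value.lastIndexOf('@'));
    }

    static long timestamp(String value) {
        return Long.parseLong(value.substring(value.lastIndexOf('@') + 1));
    }

    // E.g. inside a ValueJoiner: which of the two joined events happened first?
    static String earlier(String left, String right) {
        return timestamp(left) <= timestamp(right) ? left : right;
    }

    public static void main(String[] args) {
        String plum = encode("ProfessorPlum", 40_000L);
        String peacock = encode("MrsPeacock", 41_000L);
        System.out.println(person(earlier(plum, peacock)) + " arrived first"); // ProfessorPlum arrived first
    }
}
```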
Finally, the Apache Kafka Streams DSL is highly extensible and composable. It should be a good choice for application development in complex and rapidly changing environments, as it enables loosely coupled code to be written which:
- doesn’t need to know about other applications that may also be using the same data, and
- can build new functionality upon new data/topics as they come into being.
Some Apache Kafka Streams Examples and Resources
Apache Kafka Streams Documentation
Data Stream Management with Apache Kafka Streams (good overview of Kafka streams deployment)
Simple Spatio-Temporal Windowing with Kafka Streams with code example
Apache Kafka DSL API documentation
Kafka Streams DSL vs Processor API vs KSQL
Crossing the streams – Joins in Apache Kafka
Java 8 Concepts: FP, Streams, and Lambda Expressions (useful background to the Java 8 syntax of Streams and Lambdas used in Kafka Streams), and Java 8 in Action: Chapter 4. Introducing streams (free chapter).
Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application
Geomesa (an open source large-scale geospatial tool) integrates with Kafka and would enable more powerful Open Geospatial Consortium (OGC) Spatio-temporal queries.
Spark Streaming vs Kafka Streams (there may be better comparisons; this is just one I came across)
Describe and execute Kafka stream topologies with Lenses SQL (in a previous blog we used the Landoop Kafka Cassandra connector, they also have an open source GUI for Kafka streams).